// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OBJECTER_H
#define CEPH_OBJECTER_H

#include <condition_variable>
#include <list>
#include <map>
#include <mutex>
#include <memory>
#include <sstream>
#include <string>
#include <string_view>
#include <type_traits>
#include <variant>

#include <boost/container/small_vector.hpp>
#include <boost/asio.hpp>

#include <fmt/format.h>

#include "include/buffer.h"
#include "include/ceph_assert.h"
#include "include/ceph_fs.h"
#include "include/common_fwd.h"
#include "include/expected.hpp"
#include "include/types.h"
#include "include/rados/rados_types.hpp"
#include "include/function2.hpp"
#include "include/neorados/RADOS_Decodable.hpp"

#include "common/admin_socket.h"
#include "common/async/completion.h"
#include "common/ceph_time.h"
#include "common/ceph_mutex.h"
#include "common/ceph_timer.h"
#include "common/config_obs.h"
#include "common/shunique_lock.h"
#include "common/zipkin_trace.h"
#include "common/Throttle.h"

#include "mon/MonClient.h"

#include "messages/MOSDOp.h"
#include "msg/Dispatcher.h"

#include "osd/OSDMap.h"

class Context;
class Messenger;
class MonClient;
class Message;

class MPoolOpReply;

class MGetPoolStatsReply;
class MStatfsReply;
class MCommandReply;
class MWatchNotify;
template <typename T>
struct EnumerationContext;
template <typename t>
struct CB_EnumerateReply;

inline constexpr std::size_t osdc_opvec_len = 2; // osdc op vector len
using osdc_opvec = boost::container::small_vector<OSDOp, osdc_opvec_len>;

// -----------------------------------------

struct ObjectOperation
{
    osdc_opvec ops; // osdc op vector
    int flags = 0;
    int priority = 0; // 优先级

    boost::container::small_vector<ceph::buffer::list *, osdc_opvec_len> out_bl; // 输出缓存队列
    boost::container::small_vector<fu2::unique_function<void(boost::system::error_code, int,
                                                             const ceph::buffer::list &bl) &&>,
                                   osdc_opvec_len>
        out_handler;                                                                    // 回调函数队列
    boost::container::small_vector<int *, osdc_opvec_len> out_rval;                     // 操作函数队列
    boost::container::small_vector<boost::system::error_code *, osdc_opvec_len> out_ec; // boost::system::error_code

    ObjectOperation() = default;
    ObjectOperation(const ObjectOperation &) = delete;
    ObjectOperation &operator=(const ObjectOperation &) = delete;
    ObjectOperation(ObjectOperation &&) = default;
    ObjectOperation &operator=(ObjectOperation &&) = default;
    ~ObjectOperation() = default;

    size_t size() const
    {
        return ops.size();
    }

    void clear()
    {
        ops.clear();
        flags = 0;
        priority = 0;
        out_bl.clear();
        out_handler.clear();
        out_rval.clear();
        out_ec.clear();
    }

    void set_last_op_flags(int flags) // 设置最后一个op的flag
    {
        ceph_assert(!ops.empty());
        ops.rbegin()->op.flags = flags;
    }

    // 设置回调函数
    void set_handler(fu2::unique_function<void(boost::system::error_code, int, const ceph::buffer::list &) &&> f)
    {
        if (f)
        {
            if (out_handler.back())
            {
                // This happens seldom enough that we may as well keep folding
                // functions together when we get another one rather than
                // using a container.
                out_handler.back() =
                    [f = std::move(f),
                     g = std::move(std::move(out_handler.back()))](boost::system::error_code ec, int r,
                                                                   const ceph::buffer::list &bl) mutable {
                        std::move(g)(ec, r, bl);
                        std::move(f)(ec, r, bl);
                    };
            }
            else
            {
                out_handler.back() = std::move(f);
            }
        }
        ceph_assert(ops.size() == out_handler.size());
    }

    // 设置回调函数，调用上一个set_handle
    void set_handler(Context *c)
    {
        if (c)
            set_handler([c = std::unique_ptr<Context>(c)](boost::system::error_code,
                                                          int r,
                                                          const ceph::buffer::list &) mutable {
                c.release()->complete(r);
            });
    }

    // 添加op
    OSDOp &add_op(int op)
    {
        ops.emplace_back();
        ops.back().op.op = op;
        out_bl.push_back(nullptr);
        ceph_assert(ops.size() == out_bl.size());
        out_handler.emplace_back();
        ceph_assert(ops.size() == out_handler.size());
        out_rval.push_back(nullptr);
        ceph_assert(ops.size() == out_rval.size());
        out_ec.push_back(nullptr);
        ceph_assert(ops.size() == out_ec.size());
        return ops.back();
    }

    // 添加data到op
    void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list &bl)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.extent.offset = off;
        osd_op.op.extent.length = len;
        osd_op.indata.claim_append(bl); // 直接将bl追加到indate里
    }

    // 添加data到writesame
    void add_writesame(int op, uint64_t off, uint64_t write_len,
                       ceph::buffer::list &bl)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.writesame.offset = off;
        osd_op.op.writesame.length = write_len;
        osd_op.op.writesame.data_length = bl.length();
        osd_op.indata.claim_append(bl);
    }
    void add_xattr(int op, const char *name, const ceph::buffer::list &data)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
        osd_op.op.xattr.value_len = data.length();
        if (name)
            osd_op.indata.append(name, osd_op.op.xattr.name_len);
        osd_op.indata.append(data);
    }
    void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
                       uint8_t cmp_mode, const ceph::buffer::list &data)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
        osd_op.op.xattr.value_len = data.length();
        osd_op.op.xattr.cmp_op = cmp_op;
        osd_op.op.xattr.cmp_mode = cmp_mode;
        if (name)
            osd_op.indata.append(name, osd_op.op.xattr.name_len);
        osd_op.indata.append(data);
    }
    void add_xattr(int op, std::string_view name, const ceph::buffer::list &data)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.xattr.name_len = name.size();
        osd_op.op.xattr.value_len = data.length();
        osd_op.indata.append(name.data(), osd_op.op.xattr.name_len);
        osd_op.indata.append(data);
    }
    void add_xattr_cmp(int op, std::string_view name, uint8_t cmp_op,
                       uint8_t cmp_mode, const ceph::buffer::list &data)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.xattr.name_len = name.size();
        osd_op.op.xattr.value_len = data.length();
        osd_op.op.xattr.cmp_op = cmp_op;
        osd_op.op.xattr.cmp_mode = cmp_mode;
        if (!name.empty())
            osd_op.indata.append(name.data(), osd_op.op.xattr.name_len);
        osd_op.indata.append(data);
    }
    void add_call(int op, std::string_view cname, std::string_view method,
                  const ceph::buffer::list &indata,
                  ceph::buffer::list *outbl, Context *ctx, int *prval)
    {
        OSDOp &osd_op = add_op(op);

        unsigned p = ops.size() - 1;
        set_handler(ctx);
        out_bl[p] = outbl;
        out_rval[p] = prval;

        osd_op.op.cls.class_len = cname.size();
        osd_op.op.cls.method_len = method.size();
        osd_op.op.cls.indata_len = indata.length();
        osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
        osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
        osd_op.indata.append(indata);
    }
    void add_call(int op, std::string_view cname, std::string_view method,
                  const ceph::buffer::list &indata,
                  fu2::unique_function<void(boost::system::error_code,
                                            const ceph::buffer::list &) &&>
                      f)
    {
        OSDOp &osd_op = add_op(op);

        set_handler([f = std::move(f)](boost::system::error_code ec,
                                       int,
                                       const ceph::buffer::list &bl) mutable {
            std::move(f)(ec, bl);
        });

        osd_op.op.cls.class_len = cname.size();
        osd_op.op.cls.method_len = method.size();
        osd_op.op.cls.indata_len = indata.length();
        osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
        osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
        osd_op.indata.append(indata);
    }
    void add_call(int op, std::string_view cname, std::string_view method,
                  const ceph::buffer::list &indata,
                  fu2::unique_function<void(boost::system::error_code, int,
                                            const ceph::buffer::list &) &&>
                      f)
    {
        OSDOp &osd_op = add_op(op);

        set_handler([f = std::move(f)](boost::system::error_code ec,
                                       int r,
                                       const ceph::buffer::list &bl) mutable {
            std::move(f)(ec, r, bl);
        });

        osd_op.op.cls.class_len = cname.size();
        osd_op.op.cls.method_len = method.size();
        osd_op.op.cls.indata_len = indata.length();
        osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
        osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
        osd_op.indata.append(indata);
    }
    void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
                  epoch_t start_epoch)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(op);
        osd_op.op.pgls.count = count;
        osd_op.op.pgls.start_epoch = start_epoch;
        encode(cookie, osd_op.indata);
    }
    void add_pgls_filter(int op, uint64_t count, const ceph::buffer::list &filter,
                         collection_list_handle_t cookie, epoch_t start_epoch)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(op);
        osd_op.op.pgls.count = count;
        osd_op.op.pgls.start_epoch = start_epoch;
        std::string cname = "pg";
        std::string mname = "filter";
        encode(cname, osd_op.indata);
        encode(mname, osd_op.indata);
        osd_op.indata.append(filter);
        encode(cookie, osd_op.indata);
    }
    void add_alloc_hint(int op, uint64_t expected_object_size,
                        uint64_t expected_write_size,
                        uint32_t flags)
    {
        OSDOp &osd_op = add_op(op);
        osd_op.op.alloc_hint.expected_object_size = expected_object_size;
        osd_op.op.alloc_hint.expected_write_size = expected_write_size;
        osd_op.op.alloc_hint.flags = flags;
    }

    // ------

    // pg
    void pg_ls(uint64_t count, ceph::buffer::list &filter,
               collection_list_handle_t cookie, epoch_t start_epoch)
    {
        if (filter.length() == 0)
            add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
        else
            add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
                            start_epoch);
        flags |= CEPH_OSD_FLAG_PGOP;
    }

    void pg_nls(uint64_t count, const ceph::buffer::list &filter,
                collection_list_handle_t cookie, epoch_t start_epoch)
    {
        if (filter.length() == 0)
            add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
        else
            add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
                            start_epoch);
        flags |= CEPH_OSD_FLAG_PGOP;
    }

    void scrub_ls(const librados::object_id_t &start_after,
                  uint64_t max_to_get,
                  std::vector<librados::inconsistent_obj_t> *objects,
                  uint32_t *interval,
                  int *rval);
    void scrub_ls(const librados::object_id_t &start_after,
                  uint64_t max_to_get,
                  std::vector<librados::inconsistent_snapset_t> *objects,
                  uint32_t *interval,
                  int *rval);

    void create(bool excl)
    {
        OSDOp &o = add_op(CEPH_OSD_OP_CREATE);
        o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
    }

    struct CB_ObjectOperation_stat
    {
        ceph::buffer::list bl;
        uint64_t *psize;
        ceph::real_time *pmtime;
        time_t *ptime;
        struct timespec *pts;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
                                int *prval, boost::system::error_code *pec)
            : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval), pec(pec) {}
        void operator()(boost::system::error_code ec, int r, const ceph::buffer::list &bl)
        {
            using ceph::decode;
            if (r >= 0)
            {
                auto p = bl.cbegin();
                try
                {
                    uint64_t size;
                    ceph::real_time mtime;
                    decode(size, p);
                    decode(mtime, p);
                    if (psize)
                        *psize = size;
                    if (pmtime)
                        *pmtime = mtime;
                    if (ptime)
                        *ptime = ceph::real_clock::to_time_t(mtime);
                    if (pts)
                        *pts = ceph::real_clock::to_timespec(mtime);
                }
                catch (const ceph::buffer::error &e)
                {
                    if (prval)
                        *prval = -EIO;
                    if (pec)
                        *pec = e.code();
                }
            }
        }
    };
    void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr, prval,
                                            nullptr));
        out_rval.back() = prval;
    }
    void stat(uint64_t *psize, ceph::real_time *pmtime,
              boost::system::error_code *ec)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr,
                                            nullptr, ec));
        out_ec.back() = ec;
    }
    void stat(uint64_t *psize, time_t *ptime, int *prval)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, nullptr, ptime, nullptr, prval,
                                            nullptr));
        out_rval.back() = prval;
    }
    void stat(uint64_t *psize, struct timespec *pts, int *prval)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, pts, prval, nullptr));
        out_rval.back() = prval;
    }
    void stat(uint64_t *psize, ceph::real_time *pmtime, nullptr_t)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr, nullptr,
                                            nullptr));
    }
    void stat(uint64_t *psize, time_t *ptime, nullptr_t)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, nullptr, ptime, nullptr, nullptr,
                                            nullptr));
    }
    void stat(uint64_t *psize, struct timespec *pts, nullptr_t)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, pts, nullptr,
                                            nullptr));
    }
    void stat(uint64_t *psize, nullptr_t, nullptr_t)
    {
        add_op(CEPH_OSD_OP_STAT);
        set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, nullptr,
                                            nullptr, nullptr));
    }

    // object cmpext
    struct CB_ObjectOperation_cmpext
    {
        int *prval = nullptr;
        boost::system::error_code *ec = nullptr;
        std::size_t *s = nullptr;
        explicit CB_ObjectOperation_cmpext(int *prval)
            : prval(prval) {}
        CB_ObjectOperation_cmpext(boost::system::error_code *ec, std::size_t *s)
            : ec(ec), s(s) {}

        void operator()(boost::system::error_code ec, int r, const ceph::buffer::list &)
        {
            if (prval)
                *prval = r;
            if (this->ec)
                *this->ec = ec;
            if (s)
                *s = static_cast<std::size_t>(-(MAX_ERRNO - r));
        }
    };

    void cmpext(uint64_t off, ceph::buffer::list &cmp_bl, int *prval)
    {
        add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
        set_handler(CB_ObjectOperation_cmpext(prval));
        out_rval.back() = prval;
    }

    void cmpext(uint64_t off, ceph::buffer::list &&cmp_bl, boost::system::error_code *ec,
                std::size_t *s)
    {
        add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
        set_handler(CB_ObjectOperation_cmpext(ec, s));
        out_ec.back() = ec;
    }

    // Used by C API
    void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval)
    {
        ceph::buffer::list cmp_bl;
        cmp_bl.append(cmp_buf, cmp_len);
        add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
        set_handler(CB_ObjectOperation_cmpext(prval));
        out_rval.back() = prval;
    }

    void read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, int *prval,
              Context *ctx)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_READ, off, len, bl);
        unsigned p = ops.size() - 1;
        out_bl[p] = pbl;
        out_rval[p] = prval;
        set_handler(ctx);
    }

    void read(uint64_t off, uint64_t len, boost::system::error_code *ec,
              ceph::buffer::list *pbl)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_READ, off, len, bl);
        out_ec.back() = ec;
        out_bl.back() = pbl;
    }

    template <typename Ex>
    struct CB_ObjectOperation_sparse_read
    {
        ceph::buffer::list *data_bl;
        Ex *extents;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_sparse_read(ceph::buffer::list *data_bl,
                                       Ex *extents,
                                       int *prval,
                                       boost::system::error_code *pec)
            : data_bl(data_bl), extents(extents), prval(prval), pec(pec) {}
        void operator()(boost::system::error_code ec, int r, const ceph::buffer::list &bl)
        {
            auto iter = bl.cbegin();
            if (r >= 0)
            {
                // NOTE: it's possible the sub-op has not been executed but the result
                // code remains zeroed. Avoid the costly exception handling on a
                // potential IO path.
                if (bl.length() > 0)
                {
                    try
                    {
                        decode(*extents, iter);
                        decode(*data_bl, iter);
                    }
                    catch (const ceph::buffer::error &e)
                    {
                        if (prval)
                            *prval = -EIO;
                        if (pec)
                            *pec = e.code();
                    }
                }
                else if (prval)
                {
                    *prval = -EIO;
                    if (pec)
                        *pec = buffer::errc::end_of_buffer;
                }
            }
        }
    };
    void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t, uint64_t> *m,
                     ceph::buffer::list *data_bl, int *prval)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
        set_handler(CB_ObjectOperation_sparse_read(data_bl, m, prval, nullptr));
        out_rval.back() = prval;
    }
    void sparse_read(uint64_t off, uint64_t len,
                     boost::system::error_code *ec,
                     std::vector<std::pair<uint64_t, uint64_t>> *m,
                     ceph::buffer::list *data_bl)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
        set_handler(CB_ObjectOperation_sparse_read(data_bl, m, nullptr, ec));
        out_ec.back() = ec;
    }
    void write(uint64_t off, ceph::buffer::list &bl,
               uint64_t truncate_size,
               uint32_t truncate_seq)
    {
        add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); // 向op添加data
        OSDOp &o = *ops.rbegin();
        o.op.extent.truncate_size = truncate_size;
        o.op.extent.truncate_seq = truncate_seq;
    }
    void write(uint64_t off, ceph::buffer::list &bl)
    {
        write(off, bl, 0, 0);
    }
    void write_full(ceph::buffer::list &bl)
    {
        add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
    }
    void writesame(uint64_t off, uint64_t write_len, ceph::buffer::list &bl)
    {
        add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
    }
    void append(ceph::buffer::list &bl)
    {
        add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
    }
    void zero(uint64_t off, uint64_t len)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_ZERO, off, len, bl);
    }
    void truncate(uint64_t off)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
    }
    void remove()
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
    }
    void mapext(uint64_t off, uint64_t len)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
    }
    void sparse_read(uint64_t off, uint64_t len)
    {
        ceph::buffer::list bl;
        add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
    }

    void checksum(uint8_t type, const ceph::buffer::list &init_value_bl,
                  uint64_t off, uint64_t len, size_t chunk_size,
                  ceph::buffer::list *pbl, int *prval, Context *ctx)
    {
        OSDOp &osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
        osd_op.op.checksum.offset = off;
        osd_op.op.checksum.length = len;
        osd_op.op.checksum.type = type;
        osd_op.op.checksum.chunk_size = chunk_size;
        osd_op.indata.append(init_value_bl);

        unsigned p = ops.size() - 1;
        out_bl[p] = pbl;
        out_rval[p] = prval;
        set_handler(ctx);
    }

    // object attrs
    void getxattr(const char *name, ceph::buffer::list *pbl, int *prval)
    {
        ceph::buffer::list bl;
        add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
        unsigned p = ops.size() - 1;
        out_bl[p] = pbl;
        out_rval[p] = prval;
    }
    void getxattr(std::string_view name, boost::system::error_code *ec,
                  buffer::list *pbl)
    {
        ceph::buffer::list bl;
        add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
        out_bl.back() = pbl;
        out_ec.back() = ec;
    }

    template <typename Vals>
    struct CB_ObjectOperation_decodevals
    {
        uint64_t max_entries;
        Vals *pattrs;
        bool *ptruncated;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_decodevals(uint64_t m, Vals *pa,
                                      bool *pt, int *pr,
                                      boost::system::error_code *pec)
            : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr), pec(pec)
        {
            if (ptruncated)
            {
                *ptruncated = false;
            }
        }
        void operator()(boost::system::error_code ec, int r, const ceph::buffer::list &bl)
        {
            if (r >= 0)
            {
                auto p = bl.cbegin();
                try
                {
                    if (pattrs)
                        decode(*pattrs, p);
                    if (ptruncated)
                    {
                        Vals ignore;
                        if (!pattrs)
                        {
                            decode(ignore, p);
                            pattrs = &ignore;
                        }
                        if (!p.end())
                        {
                            decode(*ptruncated, p);
                        }
                        else
                        {
                            // The OSD did not provide this.  Since old OSDs do not
                            // enfoce omap result limits either, we can infer it from
                            // the size of the result
                            *ptruncated = (pattrs->size() == max_entries);
                        }
                    }
                }
                catch (const ceph::buffer::error &e)
                {
                    if (prval)
                        *prval = -EIO;
                    if (pec)
                        *pec = e.code();
                }
            }
        }
    };
    template <typename Keys>
    struct CB_ObjectOperation_decodekeys
    {
        uint64_t max_entries;
        Keys *pattrs;
        bool *ptruncated;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_decodekeys(uint64_t m, Keys *pa, bool *pt,
                                      int *pr, boost::system::error_code *pec)
            : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr), pec(pec)
        {
            if (ptruncated)
            {
                *ptruncated = false;
            }
        }
        void operator()(boost::system::error_code ec, int r, const ceph::buffer::list &bl)
        {
            if (r >= 0)
            {
                using ceph::decode;
                auto p = bl.cbegin();
                try
                {
                    if (pattrs)
                        decode(*pattrs, p);
                    if (ptruncated)
                    {
                        Keys ignore;
                        if (!pattrs)
                        {
                            decode(ignore, p);
                            pattrs = &ignore;
                        }
                        if (!p.end())
                        {
                            decode(*ptruncated, p);
                        }
                        else
                        {
                            // the OSD did not provide this.  since old OSDs do not
                            // enforce omap result limits either, we can infer it from
                            // the size of the result
                            *ptruncated = (pattrs->size() == max_entries);
                        }
                    }
                }
                catch (const ceph::buffer::error &e)
                {
                    if (prval)
                        *prval = -EIO;
                    if (pec)
                        *pec = e.code();
                }
            }
        }
    };
    struct CB_ObjectOperation_decodewatchers
    {
        std::list<obj_watch_t> *pwatchers;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_decodewatchers(std::list<obj_watch_t> *pw, int *pr,
                                          boost::system::error_code *pec)
            : pwatchers(pw), prval(pr), pec(pec) {}
        void operator()(boost::system::error_code ec, int r,
                        const ceph::buffer::list &bl)
        {
            if (r >= 0)
            {
                auto p = bl.cbegin();
                try
                {
                    obj_list_watch_response_t resp;
                    decode(resp, p);
                    if (pwatchers)
                    {
                        for (const auto &watch_item : resp.entries)
                        {
                            obj_watch_t ow;
                            std::string sa = watch_item.addr.get_legacy_str();
                            strncpy(ow.addr, sa.c_str(), sizeof(ow.addr) - 1);
                            ow.addr[sizeof(ow.addr) - 1] = '\0';
                            ow.watcher_id = watch_item.name.num();
                            ow.cookie = watch_item.cookie;
                            ow.timeout_seconds = watch_item.timeout_seconds;
                            pwatchers->push_back(std::move(ow));
                        }
                    }
                }
                catch (const ceph::buffer::error &e)
                {
                    if (prval)
                        *prval = -EIO;
                    if (pec)
                        *pec = e.code();
                }
            }
        }
    };

    struct CB_ObjectOperation_decodewatchersneo
    {
        std::vector<neorados::ObjWatcher> *pwatchers;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_decodewatchersneo(std::vector<neorados::ObjWatcher> *pw,
                                             int *pr,
                                             boost::system::error_code *pec)
            : pwatchers(pw), prval(pr), pec(pec) {}
        void operator()(boost::system::error_code ec, int r,
                        const ceph::buffer::list &bl)
        {
            if (r >= 0)
            {
                auto p = bl.cbegin();
                try
                {
                    obj_list_watch_response_t resp;
                    decode(resp, p);
                    if (pwatchers)
                    {
                        for (const auto &watch_item : resp.entries)
                        {
                            neorados::ObjWatcher ow;
                            ow.addr = watch_item.addr.get_legacy_str();
                            ow.watcher_id = watch_item.name.num();
                            ow.cookie = watch_item.cookie;
                            ow.timeout_seconds = watch_item.timeout_seconds;
                            pwatchers->push_back(std::move(ow));
                        }
                    }
                }
                catch (const ceph::buffer::error &e)
                {
                    if (prval)
                        *prval = -EIO;
                    if (pec)
                        *pec = e.code();
                }
            }
        }
    };

    struct CB_ObjectOperation_decodesnaps
    {
        librados::snap_set_t *psnaps;
        neorados::SnapSet *neosnaps;
        int *prval;
        boost::system::error_code *pec;
        CB_ObjectOperation_decodesnaps(librados::snap_set_t *ps,
                                       neorados::SnapSet *ns, int *pr,
                                       boost::system::error_code *pec)
            : psnaps(ps), neosnaps(ns), prval(pr), pec(pec) {}
        void operator()(boost::system::error_code ec, int r, const ceph::buffer::list &bl)
        {
            if (r >= 0)
            {
                using ceph::decode;
                auto p = bl.cbegin();
                try
                {
                    obj_list_snap_response_t resp;
                    decode(resp, p);
                    if (psnaps)
                    {
                        psnaps->clones.clear();
                        for (auto ci = resp.clones.begin();
                             ci != resp.clones.end();
                             ++ci)
                        {
                            librados::clone_info_t clone;

                            clone.cloneid = ci->cloneid;
                            clone.snaps.reserve(ci->snaps.size());
                            clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
                                               ci->snaps.end());
                            clone.overlap = ci->overlap;
                            clone.size = ci->size;

                            psnaps->clones.push_back(clone);
                        }
                        psnaps->seq = resp.seq;
                    }

                    if (neosnaps)
                    {
                        neosnaps->clones.clear();
                        for (auto &&c : resp.clones)
                        {
                            neorados::CloneInfo clone;

                            clone.cloneid = std::move(c.cloneid);
                            clone.snaps.reserve(c.snaps.size());
                            std::move(c.snaps.begin(), c.snaps.end(),
                                      std::back_inserter(clone.snaps));
                            clone.overlap = c.overlap;
                            clone.size = c.size;
                            neosnaps->clones.push_back(std::move(clone));
                        }
                        neosnaps->seq = resp.seq;
                    }
                }
                catch (const ceph::buffer::error &e)
                {
                    if (prval)
                        *prval = -EIO;
                    if (pec)
                        *pec = e.code();
                }
            }
        }
    };
    void getxattrs(std::map<std::string, ceph::buffer::list> *pattrs, int *prval)
    {
        add_op(CEPH_OSD_OP_GETXATTRS);
        if (pattrs || prval)
        {
            set_handler(CB_ObjectOperation_decodevals(0, pattrs, nullptr, prval,
                                                      nullptr));
            out_rval.back() = prval;
        }
    }
    void getxattrs(boost::system::error_code *ec,
                   boost::container::flat_map<std::string, ceph::buffer::list> *pattrs)
    {
        add_op(CEPH_OSD_OP_GETXATTRS);
        set_handler(CB_ObjectOperation_decodevals(0, pattrs, nullptr, nullptr, ec));
        out_ec.back() = ec;
    }
    void setxattr(const char *name, const ceph::buffer::list &bl)
    {
        add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
    }
    void setxattr(std::string_view name, const ceph::buffer::list &bl)
    {
        add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
    }
    void setxattr(const char *name, const std::string &s)
    {
        ceph::buffer::list bl;
        bl.append(s);
        add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
    }
    void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
                  const ceph::buffer::list &bl)
    {
        add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
    }
    void cmpxattr(std::string_view name, uint8_t cmp_op, uint8_t cmp_mode,
                  const ceph::buffer::list &bl)
    {
        add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
    }
    void rmxattr(const char *name)
    {
        ceph::buffer::list bl;
        add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
    }
    void rmxattr(std::string_view name)
    {
        ceph::buffer::list bl;
        add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
    }
    void setxattrs(map<string, ceph::buffer::list> &attrs)
    {
        using ceph::encode;
        ceph::buffer::list bl;
        encode(attrs, bl);
        add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
    }
    void resetxattrs(const char *prefix, std::map<std::string, ceph::buffer::list> &attrs)
    {
        using ceph::encode;
        ceph::buffer::list bl;
        encode(attrs, bl);
        add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
    }

    // trivialmap
    void tmap_update(ceph::buffer::list &bl)
    {
        add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
    }

    // objectmap
    void omap_get_keys(const std::string &start_after,
                       uint64_t max_to_get,
                       std::set<std::string> *out_set,
                       bool *ptruncated,
                       int *prval)
    {
        using ceph::encode;
        OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
        ceph::buffer::list bl;
        encode(start_after, bl);
        encode(max_to_get, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        if (prval || ptruncated || out_set)
        {
            set_handler(CB_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval,
                                                      nullptr));
            out_rval.back() = prval;
        }
    }
    void omap_get_keys(std::optional<std::string_view> start_after,
                       uint64_t max_to_get,
                       boost::system::error_code *ec,
                       boost::container::flat_set<std::string> *out_set,
                       bool *ptruncated)
    {
        OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
        ceph::buffer::list bl;
        encode(start_after ? *start_after : std::string_view{}, bl);
        encode(max_to_get, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        set_handler(
            CB_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, nullptr,
                                          ec));
        out_ec.back() = ec;
    }

    void omap_get_vals(const std::string &start_after,
                       const std::string &filter_prefix,
                       uint64_t max_to_get,
                       std::map<std::string, ceph::buffer::list> *out_set,
                       bool *ptruncated,
                       int *prval)
    {
        using ceph::encode;
        OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
        ceph::buffer::list bl;
        encode(start_after, bl);
        encode(max_to_get, bl);
        encode(filter_prefix, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        if (prval || out_set || ptruncated)
        {
            set_handler(CB_ObjectOperation_decodevals(max_to_get, out_set, ptruncated,
                                                      prval, nullptr));
            out_rval.back() = prval;
        }
    }

    void omap_get_vals(std::optional<std::string_view> start_after,
                       std::optional<std::string_view> filter_prefix,
                       uint64_t max_to_get,
                       boost::system::error_code *ec,
                       boost::container::flat_map<std::string, ceph::buffer::list> *out_set,
                       bool *ptruncated)
    {
        OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
        ceph::buffer::list bl;
        encode(start_after ? *start_after : std::string_view{}, bl);
        encode(max_to_get, bl);
        encode(filter_prefix ? *start_after : std::string_view{}, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        set_handler(CB_ObjectOperation_decodevals(max_to_get, out_set, ptruncated,
                                                  nullptr, ec));
        out_ec.back() = ec;
    }

    void omap_get_vals_by_keys(const std::set<std::string> &to_get,
                               std::map<std::string, ceph::buffer::list> *out_set,
                               int *prval)
    {
        OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
        ceph::buffer::list bl;
        encode(to_get, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        if (prval || out_set)
        {
            set_handler(CB_ObjectOperation_decodevals(0, out_set, nullptr, prval,
                                                      nullptr));
            out_rval.back() = prval;
        }
    }

    void omap_get_vals_by_keys(
        const boost::container::flat_set<std::string> &to_get,
        boost::system::error_code *ec,
        boost::container::flat_map<std::string, ceph::buffer::list> *out_set)
    {
        OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
        ceph::buffer::list bl;
        encode(to_get, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        set_handler(CB_ObjectOperation_decodevals(0, out_set, nullptr, nullptr,
                                                  ec));
        out_ec.back() = ec;
    }

    void omap_cmp(const std::map<std::string, pair<ceph::buffer::list, int>> &assertions,
                  int *prval)
    {
        using ceph::encode;
        OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
        ceph::buffer::list bl;
        encode(assertions, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        if (prval)
        {
            unsigned p = ops.size() - 1;
            out_rval[p] = prval;
        }
    }

    void omap_cmp(const boost::container::flat_map<
                      std::string, pair<ceph::buffer::list, int>> &assertions,
                  boost::system::error_code *ec)
    {
        OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
        ceph::buffer::list bl;
        encode(assertions, bl);
        op.op.extent.offset = 0;
        op.op.extent.length = bl.length();
        op.indata.claim_append(bl);
        out_ec.back() = ec;
    }

    struct C_ObjectOperation_copyget : public Context
    {
        ceph::buffer::list bl;
        object_copy_cursor_t *cursor;
        uint64_t *out_size;
        ceph::real_time *out_mtime;
        std::map<std::string, ceph::buffer::list> *out_attrs;
        ceph::buffer::list *out_data, *out_omap_header, *out_omap_data;
        std::vector<snapid_t> *out_snaps;
        snapid_t *out_snap_seq;
        uint32_t *out_flags;
        uint32_t *out_data_digest;
        uint32_t *out_omap_digest;
        mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t>> *out_reqids;
        mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
        uint64_t *out_truncate_seq;
        uint64_t *out_truncate_size;
        int *prval;
        C_ObjectOperation_copyget(object_copy_cursor_t *c,
                                  uint64_t *s,
                                  ceph::real_time *m,
                                  std::map<std::string, ceph::buffer::list> *a,
                                  ceph::buffer::list *d, ceph::buffer::list *oh,
                                  ceph::buffer::list *o,
                                  std::vector<snapid_t> *osnaps,
                                  snapid_t *osnap_seq,
                                  uint32_t *flags,
                                  uint32_t *dd,
                                  uint32_t *od,
                                  mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t>> *oreqids,
                                  mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
                                  uint64_t *otseq,
                                  uint64_t *otsize,
                                  int *r)
            : cursor(c),
              out_size(s), out_mtime(m),
              out_attrs(a), out_data(d), out_omap_header(oh),
              out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
              out_flags(flags), out_data_digest(dd), out_omap_digest(od),
              out_reqids(oreqids),
              out_reqid_return_codes(oreqid_return_codes),
              out_truncate_seq(otseq),
              out_truncate_size(otsize),
              prval(r) {}
        void finish(int r) override
        {
            using ceph::decode;
            // reqids are copied on ENOENT
            if (r < 0 && r != -ENOENT)
                return;
            try
            {
                auto p = bl.cbegin();
                object_copy_data_t copy_reply;
                decode(copy_reply, p);
                if (r == -ENOENT)
                {
                    if (out_reqids)
                        *out_reqids = copy_reply.reqids;
                    return;
                }
                if (out_size)
                    *out_size = copy_reply.size;
                if (out_mtime)
                    *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
                if (out_attrs)
                    *out_attrs = copy_reply.attrs;
                if (out_data)
                    out_data->claim_append(copy_reply.data);
                if (out_omap_header)
                    out_omap_header->claim_append(copy_reply.omap_header);
                if (out_omap_data)
                    *out_omap_data = copy_reply.omap_data;
                if (out_snaps)
                    *out_snaps = copy_reply.snaps;
                if (out_snap_seq)
                    *out_snap_seq = copy_reply.snap_seq;
                if (out_flags)
                    *out_flags = copy_reply.flags;
                if (out_data_digest)
                    *out_data_digest = copy_reply.data_digest;
                if (out_omap_digest)
                    *out_omap_digest = copy_reply.omap_digest;
                if (out_reqids)
                    *out_reqids = copy_reply.reqids;
                if (out_reqid_return_codes)
                    *out_reqid_return_codes = copy_reply.reqid_return_codes;
                if (out_truncate_seq)
                    *out_truncate_seq = copy_reply.truncate_seq;
                if (out_truncate_size)
                    *out_truncate_size = copy_reply.truncate_size;
                *cursor = copy_reply.cursor;
            }
            catch (const ceph::buffer::error &e)
            {
                if (prval)
                    *prval = -EIO;
            }
        }
    };

    void copy_get(object_copy_cursor_t *cursor,
                  uint64_t max,
                  uint64_t *out_size,
                  ceph::real_time *out_mtime,
                  std::map<std::string, ceph::buffer::list> *out_attrs,
                  ceph::buffer::list *out_data,
                  ceph::buffer::list *out_omap_header,
                  ceph::buffer::list *out_omap_data,
                  std::vector<snapid_t> *out_snaps,
                  snapid_t *out_snap_seq,
                  uint32_t *out_flags,
                  uint32_t *out_data_digest,
                  uint32_t *out_omap_digest,
                  mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t>> *out_reqids,
                  mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
                  uint64_t *truncate_seq,
                  uint64_t *truncate_size,
                  int *prval)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_COPY_GET);
        osd_op.op.copy_get.max = max;
        encode(*cursor, osd_op.indata);
        encode(max, osd_op.indata);
        unsigned p = ops.size() - 1;
        out_rval[p] = prval;
        C_ObjectOperation_copyget *h =
            new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
                                          out_attrs, out_data, out_omap_header,
                                          out_omap_data, out_snaps, out_snap_seq,
                                          out_flags, out_data_digest,
                                          out_omap_digest, out_reqids,
                                          out_reqid_return_codes, truncate_seq,
                                          truncate_size, prval);
        out_bl[p] = &h->bl;
        set_handler(h);
    }

    void undirty()
    {
        add_op(CEPH_OSD_OP_UNDIRTY);
    }

    struct C_ObjectOperation_isdirty : public Context
    {
        ceph::buffer::list bl;
        bool *pisdirty;
        int *prval;
        C_ObjectOperation_isdirty(bool *p, int *r)
            : pisdirty(p), prval(r) {}
        void finish(int r) override
        {
            using ceph::decode;
            if (r < 0)
                return;
            try
            {
                auto p = bl.cbegin();
                bool isdirty;
                decode(isdirty, p);
                if (pisdirty)
                    *pisdirty = isdirty;
            }
            catch (const ceph::buffer::error &e)
            {
                if (prval)
                    *prval = -EIO;
            }
        }
    };

    void is_dirty(bool *pisdirty, int *prval)
    {
        add_op(CEPH_OSD_OP_ISDIRTY);
        unsigned p = ops.size() - 1;
        out_rval[p] = prval;
        C_ObjectOperation_isdirty *h =
            new C_ObjectOperation_isdirty(pisdirty, prval);
        out_bl[p] = &h->bl;
        set_handler(h);
    }

    struct C_ObjectOperation_hit_set_ls : public Context
    {
        ceph::buffer::list bl;
        std::list<std::pair<time_t, time_t>> *ptls;
        std::list<std::pair<ceph::real_time, ceph::real_time>> *putls;
        int *prval;
        C_ObjectOperation_hit_set_ls(std::list<std::pair<time_t, time_t>> *t,
                                     std::list<std::pair<ceph::real_time,
                                                         ceph::real_time>> *ut,
                                     int *r)
            : ptls(t), putls(ut), prval(r) {}
        void finish(int r) override
        {
            using ceph::decode;
            if (r < 0)
                return;
            try
            {
                auto p = bl.cbegin();
                std::list<std::pair<ceph::real_time, ceph::real_time>> ls;
                decode(ls, p);
                if (ptls)
                {
                    ptls->clear();
                    for (auto p = ls.begin(); p != ls.end(); ++p)
                        // round initial timestamp up to the next full second to
                        // keep this a valid interval.
                        ptls->push_back(
                            std::make_pair(ceph::real_clock::to_time_t(
                                               ceph::ceil(p->first,
                                                          // Sadly, no time literals until C++14.
                                                          std::chrono::seconds(1))),
                                           ceph::real_clock::to_time_t(p->second)));
                }
                if (putls)
                    putls->swap(ls);
            }
            catch (const ceph::buffer::error &e)
            {
                r = -EIO;
            }
            if (prval)
                *prval = r;
        }
    };

    /**
     * std::list available HitSets.
     *
     * We will get back a std::list of time intervals.  Note that the most
     * recent range may have an empty end timestamp if it is still
     * accumulating.
     *
     * @param pls [out] std::list of time intervals
     * @param prval [out] return value
     */
    void hit_set_ls(std::list<std::pair<time_t, time_t>> *pls, int *prval)
    {
        add_op(CEPH_OSD_OP_PG_HITSET_LS);
        unsigned p = ops.size() - 1;
        out_rval[p] = prval;
        C_ObjectOperation_hit_set_ls *h =
            new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
        out_bl[p] = &h->bl;
        set_handler(h);
    }
    void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time>> *pls,
                    int *prval)
    {
        add_op(CEPH_OSD_OP_PG_HITSET_LS);
        unsigned p = ops.size() - 1;
        out_rval[p] = prval;
        C_ObjectOperation_hit_set_ls *h =
            new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
        out_bl[p] = &h->bl;
        set_handler(h);
    }

    /**
     * get HitSet
     *
     * Return an encoded HitSet that includes the provided time
     * interval.
     *
     * @param stamp [in] timestamp
     * @param pbl [out] target buffer for encoded HitSet
     * @param prval [out] return value
     */
    void hit_set_get(ceph::real_time stamp, ceph::buffer::list *pbl, int *prval)
    {
        OSDOp &op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
        op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
        unsigned p = ops.size() - 1;
        out_rval[p] = prval;
        out_bl[p] = pbl;
    }

    void omap_get_header(ceph::buffer::list *bl, int *prval)
    {
        add_op(CEPH_OSD_OP_OMAPGETHEADER);
        unsigned p = ops.size() - 1;
        out_bl[p] = bl;
        out_rval[p] = prval;
    }

    void omap_get_header(boost::system::error_code *ec, ceph::buffer::list *bl)
    {
        add_op(CEPH_OSD_OP_OMAPGETHEADER);
        out_bl.back() = bl;
        out_ec.back() = ec;
    }

    void omap_set(const map<string, ceph::buffer::list> &map)
    {
        ceph::buffer::list bl;
        encode(map, bl);
        add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
    }

    void omap_set(const boost::container::flat_map<string, ceph::buffer::list> &map)
    {
        ceph::buffer::list bl;
        encode(map, bl);
        add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
    }

    void omap_set_header(ceph::buffer::list &bl)
    {
        add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
    }

    void omap_clear()
    {
        add_op(CEPH_OSD_OP_OMAPCLEAR);
    }

    void omap_rm_keys(const std::set<std::string> &to_remove)
    {
        using ceph::encode;
        ceph::buffer::list bl;
        encode(to_remove, bl);
        add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
    }
    void omap_rm_keys(const boost::container::flat_set<std::string> &to_remove)
    {
        ceph::buffer::list bl;
        encode(to_remove, bl);
        add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
    }

    void omap_rm_range(std::string_view key_begin, std::string_view key_end)
    {
        ceph::buffer::list bl;
        using ceph::encode;
        encode(key_begin, bl);
        encode(key_end, bl);
        add_data(CEPH_OSD_OP_OMAPRMKEYRANGE, 0, bl.length(), bl);
    }

    // object classes
    void call(const char *cname, const char *method, ceph::buffer::list &indata)
    {
        add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
    }

    void call(const char *cname, const char *method, ceph::buffer::list &indata,
              ceph::buffer::list *outdata, Context *ctx, int *prval)
    {
        add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
    }

    void call(std::string_view cname, std::string_view method,
              const ceph::buffer::list &indata, boost::system::error_code *ec)
    {
        add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
        out_ec.back() = ec;
    }

    void call(std::string_view cname, std::string_view method, const ceph::buffer::list &indata,
              boost::system::error_code *ec, ceph::buffer::list *outdata)
    {
        add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, nullptr, nullptr);
        out_ec.back() = ec;
    }
    void call(std::string_view cname, std::string_view method,
              const ceph::buffer::list &indata,
              fu2::unique_function<void(boost::system::error_code,
                                        const ceph::buffer::list &) &&>
                  f)
    {
        add_call(CEPH_OSD_OP_CALL, cname, method, indata, std::move(f));
    }
    void call(std::string_view cname, std::string_view method,
              const ceph::buffer::list &indata,
              fu2::unique_function<void(boost::system::error_code, int,
                                        const ceph::buffer::list &) &&>
                  f)
    {
        add_call(CEPH_OSD_OP_CALL, cname, method, indata, std::move(f));
    }

    // watch/notify
    void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0)
    {
        OSDOp &osd_op = add_op(CEPH_OSD_OP_WATCH);
        osd_op.op.watch.cookie = cookie;
        osd_op.op.watch.op = op;
        osd_op.op.watch.timeout = timeout;
    }

    void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
                ceph::buffer::list &bl, ceph::buffer::list *inbl)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_NOTIFY);
        osd_op.op.notify.cookie = cookie;
        encode(prot_ver, *inbl);
        encode(timeout, *inbl);
        encode(bl, *inbl);
        osd_op.indata.append(*inbl);
    }

    void notify_ack(uint64_t notify_id, uint64_t cookie,
                    ceph::buffer::list &reply_bl)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
        ceph::buffer::list bl;
        encode(notify_id, bl);
        encode(cookie, bl);
        encode(reply_bl, bl);
        osd_op.indata.append(bl);
    }

    void list_watchers(std::list<obj_watch_t> *out,
                       int *prval)
    {
        add_op(CEPH_OSD_OP_LIST_WATCHERS);
        if (prval || out)
        {
            set_handler(CB_ObjectOperation_decodewatchers(out, prval, nullptr));
            out_rval.back() = prval;
        }
    }
    void list_watchers(vector<neorados::ObjWatcher> *out,
                       boost::system::error_code *ec)
    {
        add_op(CEPH_OSD_OP_LIST_WATCHERS);
        set_handler(CB_ObjectOperation_decodewatchersneo(out, nullptr, ec));
        out_ec.back() = ec;
    }

    void list_snaps(librados::snap_set_t *out, int *prval,
                    boost::system::error_code *ec = nullptr)
    {
        add_op(CEPH_OSD_OP_LIST_SNAPS);
        if (prval || out || ec)
        {
            set_handler(CB_ObjectOperation_decodesnaps(out, nullptr, prval, ec));
            out_rval.back() = prval;
            out_ec.back() = ec;
        }
    }

    void list_snaps(neorados::SnapSet *out, int *prval,
                    boost::system::error_code *ec = nullptr)
    {
        add_op(CEPH_OSD_OP_LIST_SNAPS);
        if (prval || out || ec)
        {
            set_handler(CB_ObjectOperation_decodesnaps(nullptr, out, prval, ec));
            out_rval.back() = prval;
            out_ec.back() = ec;
        }
    }

    void assert_version(uint64_t ver)
    {
        OSDOp &osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
        osd_op.op.assert_ver.ver = ver;
    }

    void cmpxattr(const char *name, const ceph::buffer::list &val,
                  int op, int mode)
    {
        add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
        OSDOp &o = *ops.rbegin();
        o.op.xattr.cmp_op = op;
        o.op.xattr.cmp_mode = mode;
    }

    void rollback(uint64_t snapid)
    {
        OSDOp &osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
        osd_op.op.snap.snapid = snapid;
    }

    void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
                   version_t src_version, unsigned flags,
                   unsigned src_fadvise_flags)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
        osd_op.op.copy_from.snapid = snapid;
        osd_op.op.copy_from.src_version = src_version;
        osd_op.op.copy_from.flags = flags;
        osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
        encode(src, osd_op.indata);
        encode(src_oloc, osd_op.indata);
    }
    void copy_from2(object_t src, snapid_t snapid, object_locator_t src_oloc,
                    version_t src_version, unsigned flags,
                    uint32_t truncate_seq, uint64_t truncate_size,
                    unsigned src_fadvise_flags)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_COPY_FROM2);
        osd_op.op.copy_from.snapid = snapid;
        osd_op.op.copy_from.src_version = src_version;
        osd_op.op.copy_from.flags = flags;
        osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
        encode(src, osd_op.indata);
        encode(src_oloc, osd_op.indata);
        encode(truncate_seq, osd_op.indata);
        encode(truncate_size, osd_op.indata);
    }

    /**
     * writeback content to backing tier
     *
     * If object is marked dirty in the cache tier, write back content
     * to backing tier. If the object is clean this is a no-op.
     *
     * If writeback races with an update, the update will block.
     *
     * use with IGNORE_CACHE to avoid triggering promote.
     */
    void cache_flush()
    {
        add_op(CEPH_OSD_OP_CACHE_FLUSH);
    }

    /**
     * writeback content to backing tier
     *
     * If object is marked dirty in the cache tier, write back content
     * to backing tier. If the object is clean this is a no-op.
     *
     * If writeback races with an update, return EAGAIN.  Requires that
     * the SKIPRWLOCKS flag be set.
     *
     * use with IGNORE_CACHE to avoid triggering promote.
     */
    void cache_try_flush()
    {
        add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
    }

    /**
     * evict object from cache tier
     *
     * If object is marked clean, remove the object from the cache tier.
     * Otherwise, return EBUSY.
     *
     * use with IGNORE_CACHE to avoid triggering promote.
     */
    void cache_evict()
    {
        add_op(CEPH_OSD_OP_CACHE_EVICT);
    }

    /*
     * Extensible tier
     */
    void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
                      version_t tgt_version, int flag)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
        osd_op.op.copy_from.snapid = snapid;
        osd_op.op.copy_from.src_version = tgt_version;
        encode(tgt, osd_op.indata);
        encode(tgt_oloc, osd_op.indata);
        set_last_op_flags(flag);
    }

    void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
                   object_t tgt_oid, uint64_t tgt_offset, int flag)
    {
        using ceph::encode;
        OSDOp &osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
        encode(src_offset, osd_op.indata);
        encode(src_length, osd_op.indata);
        encode(tgt_oloc, osd_op.indata);
        encode(tgt_oid, osd_op.indata);
        encode(tgt_offset, osd_op.indata);
        set_last_op_flags(flag);
    }

    void tier_promote()
    {
        add_op(CEPH_OSD_OP_TIER_PROMOTE);
    }

    void unset_manifest()
    {
        add_op(CEPH_OSD_OP_UNSET_MANIFEST);
    }

    void tier_flush()
    {
        add_op(CEPH_OSD_OP_TIER_FLUSH);
    }

    void tier_evict()
    {
        add_op(CEPH_OSD_OP_TIER_EVICT);
    }

    void set_alloc_hint(uint64_t expected_object_size,
                        uint64_t expected_write_size,
                        uint32_t flags)
    {
        add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
                       expected_write_size, flags);

        // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
        // not worth a feature bit.  Set FAILOK per-op flag to make
        // sure older osds don't trip over an unsupported opcode.
        set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
    }

    template <typename V>
    void dup(V &sops)
    {
        ops.clear();
        std::copy(sops.begin(), sops.end(),
                  std::back_inserter(ops));
        out_bl.resize(sops.size());
        out_handler.resize(sops.size());
        out_rval.resize(sops.size());
        out_ec.resize(sops.size());
        for (uint32_t i = 0; i < sops.size(); i++)
        {
            out_bl[i] = &sops[i].outdata;
            out_rval[i] = &sops[i].rval;
            out_ec[i] = nullptr;
        }
    }

    /**
     * Pin/unpin an object in cache tier
     */
    void cache_pin()
    {
        add_op(CEPH_OSD_OP_CACHE_PIN);
    }

    void cache_unpin()
    {
        add_op(CEPH_OSD_OP_CACHE_UNPIN);
    }
};

inline std::ostream &operator<<(std::ostream &m, const ObjectOperation &oo)
{
    auto i = oo.ops.cbegin();
    m << '[';
    while (i != oo.ops.cend())
    {
        if (i != oo.ops.cbegin())
            m << ' ';
        m << *i;
        ++i;
    }
    m << ']';
    return m;
}

// ----------------

class Objecter : public md_config_obs_t, public Dispatcher
{
    using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;

public:
    using OpSignature = void(boost::system::error_code);
    using OpCompletion = ceph::async::Completion<OpSignature>;

    // config observer bits
    const char **get_tracked_conf_keys() const override;
    void handle_conf_change(const ConfigProxy &conf,
                            const std::set<std::string> &changed) override;

public:
    Messenger *messenger;
    MonClient *monc;
    boost::asio::io_context &service;
    // The guaranteed sequenced, one-at-a-time execution and apparently
    // people sometimes depend on this.
    // 提供序列化的处理程序执行
    boost::asio::io_context::strand finish_strand{service};
    ZTracer::Endpoint trace_endpoint{"0.0.0.0", 0, "Objecter"};

private:
    std::unique_ptr<OSDMap> osdmap{std::make_unique<OSDMap>()};

public:
    using Dispatcher::cct;
    std::multimap<std::string, std::string> crush_location;

    std::atomic<bool> initialized{false};

private:
    std::atomic<uint64_t> last_tid{0};
    std::atomic<unsigned> inflight_ops{0};
    std::atomic<int> client_inc{-1};
    uint64_t max_linger_id{0};
    std::atomic<unsigned> num_in_flight{0};
    std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
    bool keep_balanced_budget = false;
    bool honor_pool_full = true;

    // If this is true, accumulate a set of blocklisted entities
    // to be drained by consume_blocklist_events.
    bool blocklist_events_enabled = false;
    std::set<entity_addr_t> blocklist_events;
    struct pg_mapping_t
    {
        epoch_t epoch = 0;
        std::vector<int> up;
        int up_primary = -1;
        std::vector<int> acting;
        int acting_primary = -1;

        pg_mapping_t() {}
        pg_mapping_t(epoch_t epoch, std::vector<int> up, int up_primary,
                     std::vector<int> acting, int acting_primary)
            : epoch(epoch), up(up), up_primary(up_primary),
              acting(acting), acting_primary(acting_primary) {}
    };
    ceph::shared_mutex pg_mapping_lock =
        ceph::make_shared_mutex("Objecter::pg_mapping_lock");
    // pool -> pg mapping
    std::map<int64_t, std::vector<pg_mapping_t>> pg_mappings;

    // convenient accessors
    bool lookup_pg_mapping(const pg_t &pg, pg_mapping_t *pg_mapping)
    {
        std::shared_lock l{pg_mapping_lock};
        auto it = pg_mappings.find(pg.pool());
        if (it == pg_mappings.end())
            return false;
        auto &mapping_array = it->second;
        if (pg.ps() >= mapping_array.size())
            return false;
        if (mapping_array[pg.ps()].epoch != pg_mapping->epoch) // stale
            return false;
        *pg_mapping = mapping_array[pg.ps()];
        return true;
    }
    void update_pg_mapping(const pg_t &pg, pg_mapping_t &&pg_mapping)
    {
        std::lock_guard l{pg_mapping_lock};
        auto &mapping_array = pg_mappings[pg.pool()];
        ceph_assert(pg.ps() < mapping_array.size());
        mapping_array[pg.ps()] = std::move(pg_mapping);
    }
    void prune_pg_mapping(const mempool::osdmap::map<int64_t, pg_pool_t> &pools)
    {
        std::lock_guard l{pg_mapping_lock};
        for (auto &pool : pools)
        {
            auto &mapping_array = pg_mappings[pool.first];
            size_t pg_num = pool.second.get_pg_num();
            if (mapping_array.size() != pg_num)
            {
                // catch both pg_num increasing & decreasing
                mapping_array.resize(pg_num);
            }
        }
        for (auto it = pg_mappings.begin(); it != pg_mappings.end();)
        {
            if (!pools.count(it->first))
            {
                // pool is gone
                pg_mappings.erase(it++);
                continue;
            }
            it++;
        }
    }

public:
    void maybe_request_map();

    void enable_blocklist_events();

private:
    void _maybe_request_map();

    version_t last_seen_osdmap_version = 0;
    version_t last_seen_pgmap_version = 0;

    mutable ceph::shared_mutex rwlock =
        ceph::make_shared_mutex("Objecter::rwlock");
    ceph::timer<ceph::coarse_mono_clock> timer;

    PerfCounters *logger = nullptr;

    uint64_t tick_event = 0;

    void start_tick();
    void tick();
    void update_crush_location();

    class RequestStateHook;

    RequestStateHook *m_request_state_hook = nullptr;

public:
    /*** track pending operations ***/
    // read

    struct OSDSession;

    struct op_target_t
    {
        int flags = 0;

        epoch_t epoch = 0; ///< latest epoch we calculated the mapping

        object_t base_oid;            // 读取的对象
        object_locator_t base_oloc;   // 对象的pool信息
        object_t target_oid;          // 最终读取的目标对象
        object_locator_t target_oloc; // 最终该目标对象的pool信息

        ///< true if we are directed at base_pgid, not base_oid
        bool precalc_pgid = false;

        ///< true if we have ever mapped to a valid pool
        bool pool_ever_existed = false;

        ///< explcit pg target, if any
        pg_t base_pgid;

        pg_t pgid;                     ///< last (raw) pg we mapped to
        spg_t actual_pgid;             ///< last (actual) spg_t we mapped to
        unsigned pg_num = 0;           ///< last pg_num we mapped to
        unsigned pg_num_mask = 0;      ///< last pg_num_mask we mapped to
        unsigned pg_num_pending = 0;   ///< last pg_num we mapped to
        std::vector<int> up;           ///< set of up osds for last pg we mapped to
        std::vector<int> acting;       ///< set of acting osds for last pg we mapped to
        int up_primary = -1;           ///< last up_primary we mapped to
        int acting_primary = -1;       ///< last acting_primary we mapped to
        int size = -1;                 ///< the size of the pool when were were last mapped
        int min_size = -1;             ///< the min size of the pool when were were last mapped
        bool sort_bitwise = false;     ///< whether the hobject_t sort order is bitwise
        bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering
        uint32_t peering_crush_bucket_count = 0;
        uint32_t peering_crush_bucket_target = 0;
        uint32_t peering_crush_bucket_barrier = 0;
        int32_t peering_crush_mandatory_member = CRUSH_ITEM_NONE;

        bool used_replica = false;
        bool paused = false;

        int osd = -1; ///< the final target osd, or -1

        epoch_t last_force_resend = 0;

        op_target_t(object_t oid, object_locator_t oloc, int flags)
            : flags(flags),
              base_oid(oid),
              base_oloc(oloc) {}

        explicit op_target_t(pg_t pgid)
            : base_oloc(pgid.pool(), pgid.ps()),
              precalc_pgid(true),
              base_pgid(pgid) {}

        op_target_t() = default;

        hobject_t get_hobj()
        {
            return hobject_t(target_oid,
                             target_oloc.key,
                             CEPH_NOSNAP,
                             target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
                             target_oloc.pool,
                             target_oloc.nspace);
        }

        bool contained_by(const hobject_t &begin, const hobject_t &end)
        {
            hobject_t h = get_hobj();
            int r = cmp(h, begin);
            return r == 0 || (r > 0 && h < end);
        }

        bool respects_full() const
        {
            return (flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
                   !(flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
        }

        void dump(ceph::Formatter *f) const;
    };

    std::unique_ptr<ceph::async::Completion<void(boost::system::error_code)>>
    OpContextVert(Context *c)
    {
        if (c)
            return ceph::async::Completion<void(boost::system::error_code)>::create(
                service.get_executor(),
                [c = std::unique_ptr<Context>(c)](boost::system::error_code e) mutable {
                    c.release()->complete(e);
                });
        else
            return nullptr;
    }

    template <typename T>
    std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
    OpContextVert(Context *c, T *p)
    {

        if (c || p)
            return ceph::async::Completion<void(boost::system::error_code, T)>::create(
                service.get_executor(),
                [c = std::unique_ptr<Context>(c), p](boost::system::error_code e, T r) mutable {
                    if (p)
                        *p = std::move(r);
                    if (c)
                        c.release()->complete(ceph::from_error_code(e));
                });
        else
            return nullptr;
    }

    template <typename T>
    std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
    OpContextVert(Context *c, T &p)
    {
        if (c)
            return ceph::async::Completion<
                void(boost::system::error_code, T)>::create(service.get_executor(),
                                                            [c = std::unique_ptr<Context>(c), &p](boost::system::error_code e, T r) mutable {
                                                                p = std::move(r);
                                                                if (c)
                                                                    c.release()->complete(ceph::from_error_code(e));
                                                            });
        else
            return nullptr;
    }

    struct Op : public RefCountedObject
    {
        OSDSession *session = nullptr;
        int incarnation = 0;

        op_target_t target;

        ConnectionRef con = nullptr;                         // for rx buffer only
        uint64_t features = CEPH_FEATURES_SUPPORTED_DEFAULT; // explicitly specified op features

        osdc_opvec ops; // op vector

        snapid_t snapid = CEPH_NOSNAP;
        SnapContext snapc;
        ceph::real_time mtime;

        ceph::buffer::list *outbl = nullptr;
        boost::container::small_vector<ceph::buffer::list *, osdc_opvec_len> out_bl;
        boost::container::small_vector<
            fu2::unique_function<void(boost::system::error_code, int,
                                      const ceph::buffer::list &bl) &&>,
            osdc_opvec_len>
            out_handler;
        boost::container::small_vector<int *, osdc_opvec_len> out_rval;
        boost::container::small_vector<boost::system::error_code *,
                                       osdc_opvec_len>
            out_ec;

        int priority = 0;
        using OpSig = void(boost::system::error_code);
        using OpComp = ceph::async::Completion<OpSig>;
        // Due to an irregularity of cmpxattr, we actualy need the 'int'
        // value for onfinish for legacy librados users. As such just
        // preserve the Context* in this one case. That way we can have
        // our callers just pass in a unique_ptr<OpComp> and not deal with
        // our signature in Objecter being different than the exposed
        // signature in RADOS.
        //
        // Add a function for the linger case, where we want better
        // semantics than Context, but still need to be under the completion_lock.
        std::variant<std::unique_ptr<OpComp>, fu2::unique_function<OpSig>, Context *> onfinish;
        uint64_t ontimeout = 0;

        ceph_tid_t tid = 0;
        int attempts = 0;

        version_t *objver;
        epoch_t *reply_epoch = nullptr;

        ceph::coarse_mono_time stamp;

        epoch_t map_dne_bound = 0;

        int budget = -1;

        /// true if we should resend this message on failure
        bool should_resend = true;

        /// true if the throttle budget is get/put on a series of OPs,
        /// instead of per OP basis, when this flag is set, the budget is
        /// acquired before sending the very first OP of the series and
        /// released upon receiving the last OP reply.
        bool ctx_budgeted = false;

        int *data_offset;

        osd_reqid_t reqid; // explicitly setting reqid
        ZTracer::Trace trace;

        static bool has_completion(decltype(onfinish) &f)
        {
            return std::visit([](auto &&arg) { return bool(arg); }, f);
        }
        bool has_completion()
        {
            return has_completion(onfinish);
        }

        static void complete(decltype(onfinish) &&f, boost::system::error_code ec,
                             int r)
        {
            std::visit([ec, r](auto &&arg) {
                if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
                                             Context *>)
                {
                    arg->complete(r);
                }
                else if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
                                                  fu2::unique_function<OpSig>>)
                {
                    std::move(arg)(ec);
                }
                else
                {
                    arg->defer(std::move(arg), ec);
                }
            },
                       std::move(f));
        }
        void complete(boost::system::error_code ec, int r)
        {
            complete(std::move(onfinish), ec, r);
        }

        Op(const object_t &o, const object_locator_t &ol, osdc_opvec &&_ops,
           int f, std::unique_ptr<OpComp> &&fin,
           version_t *ov, int *offset = nullptr,
           ZTracer::Trace *parent_trace = nullptr) : target(o, ol, f),
                                                     ops(std::move(_ops)),
                                                     out_bl(ops.size(), nullptr),
                                                     out_handler(ops.size()),
                                                     out_rval(ops.size(), nullptr),
                                                     out_ec(ops.size(), nullptr),
                                                     onfinish(std::move(fin)),
                                                     objver(ov),
                                                     data_offset(offset)
        {
            if (target.base_oloc.key == o)
                target.base_oloc.key.clear();
            if (parent_trace && parent_trace->valid())
            {
                trace.init("op", nullptr, parent_trace);
                trace.event("start");
            }
        }

        Op(const object_t &o, const object_locator_t &ol, osdc_opvec &&_ops,
           int f, Context *fin, version_t *ov, int *offset = nullptr,
           ZTracer::Trace *parent_trace = nullptr) : target(o, ol, f),
                                                     ops(std::move(_ops)),
                                                     out_bl(ops.size(), nullptr),
                                                     out_handler(ops.size()),
                                                     out_rval(ops.size(), nullptr),
                                                     out_ec(ops.size(), nullptr),
                                                     onfinish(fin),
                                                     objver(ov),
                                                     data_offset(offset)
        {
            if (target.base_oloc.key == o)
                target.base_oloc.key.clear();
            if (parent_trace && parent_trace->valid())
            {
                trace.init("op", nullptr, parent_trace);
                trace.event("start");
            }
        }

        Op(const object_t &o, const object_locator_t &ol, osdc_opvec &&_ops,
           int f, fu2::unique_function<OpSig> &&fin, version_t *ov, int *offset = nullptr,
           ZTracer::Trace *parent_trace = nullptr) : target(o, ol, f),
                                                     ops(std::move(_ops)),
                                                     out_bl(ops.size(), nullptr),
                                                     out_handler(ops.size()),
                                                     out_rval(ops.size(), nullptr),
                                                     out_ec(ops.size(), nullptr),
                                                     onfinish(std::move(fin)),
                                                     objver(ov),
                                                     data_offset(offset)
        {
            if (target.base_oloc.key == o)
                target.base_oloc.key.clear();
            if (parent_trace && parent_trace->valid())
            {
                trace.init("op", nullptr, parent_trace);
                trace.event("start");
            }
        }

        bool operator<(const Op &other) const
        {
            return tid < other.tid;
        }

    private:
        ~Op() override
        {
            trace.event("finish");
        }
    };

    struct CB_Op_Map_Latest
    {
        Objecter *objecter;
        ceph_tid_t tid;
        CB_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
        void operator()(boost::system::error_code err, version_t latest, version_t);
    };

    struct CB_Command_Map_Latest
    {
        Objecter *objecter;
        uint64_t tid;
        CB_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
        void operator()(boost::system::error_code err, version_t latest, version_t);
    };

    struct C_Stat : public Context
    {
        ceph::buffer::list bl;
        uint64_t *psize;
        ceph::real_time *pmtime;
        Context *fin;
        C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) : psize(ps), pmtime(pm), fin(c) {}
        void finish(int r) override
        {
            using ceph::decode;
            if (r >= 0)
            {
                auto p = bl.cbegin();
                uint64_t s;
                ceph::real_time m;
                decode(s, p);
                decode(m, p);
                if (psize)
                    *psize = s;
                if (pmtime)
                    *pmtime = m;
            }
            fin->complete(r);
        }
    };

    struct C_GetAttrs : public Context
    {
        ceph::buffer::list bl;
        std::map<std::string, ceph::buffer::list> &attrset;
        Context *fin;
        C_GetAttrs(std::map<std::string, ceph::buffer::list> &set, Context *c) : attrset(set),
                                                                                 fin(c) {}
        void finish(int r) override
        {
            using ceph::decode;
            if (r >= 0)
            {
                auto p = bl.cbegin();
                decode(attrset, p);
            }
            fin->complete(r);
        }
    };

    // Pools and statistics
    struct NListContext
    {
        collection_list_handle_t pos;

        // these are for !sortbitwise compat only
        int current_pg = 0;
        int starting_pg_num = 0;
        bool sort_bitwise = false;

        bool at_end_of_pool = false; ///< publicly visible end flag

        int64_t pool_id = -1;
        int pool_snap_seq = 0;
        uint64_t max_entries = 0;
        std::string nspace;

        ceph::buffer::list bl; // raw data read to here
        std::list<librados::ListObjectImpl> list;

        ceph::buffer::list filter;

        // The budget associated with this context, once it is set (>= 0),
        // the budget is not get/released on OP basis, instead the budget
        // is acquired before sending the first OP and released upon receiving
        // the last op reply.
        int ctx_budget = -1;

        bool at_end() const
        {
            return at_end_of_pool;
        }

        uint32_t get_pg_hash_position() const
        {
            return pos.get_hash();
        }
    };

    struct C_NList : public Context
    {
        NListContext *list_context;
        Context *final_finish;
        Objecter *objecter;
        epoch_t epoch;
        C_NList(NListContext *lc, Context *finish, Objecter *ob) : list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
        void finish(int r) override
        {
            if (r >= 0)
            {
                objecter->_nlist_reply(list_context, r, final_finish, epoch);
            }
            else
            {
                final_finish->complete(r);
            }
        }
    };

    struct PoolStatOp
    {
        ceph_tid_t tid;
        std::vector<std::string> pools;
        using OpSig = void(boost::system::error_code,
                           boost::container::flat_map<std::string, pool_stat_t>,
                           bool);
        using OpComp = ceph::async::Completion<OpSig>;
        std::unique_ptr<OpComp> onfinish;
        std::uint64_t ontimeout;
        ceph::coarse_mono_time last_submit;
    };

    struct StatfsOp
    {
        ceph_tid_t tid;
        boost::optional<int64_t> data_pool;
        using OpSig = void(boost::system::error_code,
                           const struct ceph_statfs);
        using OpComp = ceph::async::Completion<OpSig>;

        std::unique_ptr<OpComp> onfinish;
        uint64_t ontimeout;

        ceph::coarse_mono_time last_submit;
    };

    struct PoolOp
    {
        ceph_tid_t tid = 0;
        int64_t pool = 0;
        std::string name;
        using OpSig = void(boost::system::error_code, ceph::buffer::list);
        using OpComp = ceph::async::Completion<OpSig>;
        std::unique_ptr<OpComp> onfinish;
        uint64_t ontimeout = 0;
        int pool_op = 0;
        int16_t crush_rule = 0;
        snapid_t snapid = 0;
        ceph::coarse_mono_time last_submit;

        PoolOp() {}
    };

    // -- osd commands --
    struct CommandOp : public RefCountedObject
    {
        OSDSession *session = nullptr;
        ceph_tid_t tid = 0;
        std::vector<std::string> cmd;
        ceph::buffer::list inbl;

        // target_osd == -1 means target_pg is valid
        const int target_osd = -1;
        const pg_t target_pg;

        op_target_t target;

        epoch_t map_dne_bound = 0;
        int map_check_error = 0; // error to return if std::map check fails
        const char *map_check_error_str = nullptr;

        using OpSig = void(boost::system::error_code, std::string,
                           ceph::buffer::list);
        using OpComp = ceph::async::Completion<OpSig>;
        std::unique_ptr<OpComp> onfinish;

        uint64_t ontimeout = 0;
        ceph::coarse_mono_time last_submit;

        CommandOp(
            int target_osd,
            std::vector<string> &&cmd,
            ceph::buffer::list &&inbl,
            decltype(onfinish) &&onfinish)
            : cmd(std::move(cmd)),
              inbl(std::move(inbl)),
              target_osd(target_osd),
              onfinish(std::move(onfinish)) {}

        CommandOp(
            pg_t pgid,
            std::vector<string> &&cmd,
            ceph::buffer::list &&inbl,
            decltype(onfinish) &&onfinish)
            : cmd(std::move(cmd)),
              inbl(std::move(inbl)),
              target_pg(pgid),
              target(pgid),
              onfinish(std::move(onfinish)) {}
    };

    void submit_command(CommandOp *c, ceph_tid_t *ptid);
    int _calc_command_target(CommandOp *c,
                             ceph::shunique_lock<ceph::shared_mutex> &sul);
    void _assign_command_session(CommandOp *c,
                                 ceph::shunique_lock<ceph::shared_mutex> &sul);
    void _send_command(CommandOp *c);
    int command_op_cancel(OSDSession *s, ceph_tid_t tid,
                          boost::system::error_code ec);
    void _finish_command(CommandOp *c, boost::system::error_code ec,
                         std::string &&rs, ceph::buffer::list &&bl);
    void handle_command_reply(MCommandReply *m);

    // -- lingering ops --

    struct LingerOp : public RefCountedObject
    {
        Objecter *objecter;
        uint64_t linger_id{0};
        op_target_t target{object_t(), object_locator_t(), 0};
        snapid_t snap{CEPH_NOSNAP};
        SnapContext snapc;
        ceph::real_time mtime;

        osdc_opvec ops;
        ceph::buffer::list inbl;
        version_t *pobjver{nullptr};

        bool is_watch{false};
        ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
        boost::system::error_code last_error;    ///< error from last failed ping|reconnect, if any
        ceph::shared_mutex watch_lock;

        // queue of pending async operations, with the timestamp of
        // when they were queued.
        std::list<ceph::coarse_mono_time> watch_pending_async;

        uint32_t register_gen{0};
        bool registered{false};
        bool canceled{false};
        using OpSig = void(boost::system::error_code, ceph::buffer::list);
        using OpComp = ceph::async::Completion<OpSig>;
        std::unique_ptr<OpComp> on_reg_commit;
        std::unique_ptr<OpComp> on_notify_finish;
        uint64_t notify_id{0};

        fu2::unique_function<void(boost::system::error_code,
                                  uint64_t notify_id,
                                  uint64_t cookie,
                                  uint64_t notifier_id,
                                  ceph::buffer::list &&bl)>
            handle;
        OSDSession *session{nullptr};

        int ctx_budget{-1};
        ceph_tid_t register_tid{0};
        ceph_tid_t ping_tid{0};
        epoch_t map_dne_bound{0};

        void _queued_async()
        {
            // watch_lock ust be locked unique
            watch_pending_async.push_back(ceph::coarse_mono_clock::now());
        }
        void finished_async()
        {
            unique_lock l(watch_lock);
            ceph_assert(!watch_pending_async.empty());
            watch_pending_async.pop_front();
        }

        explicit LingerOp(Objecter *o, uint64_t linger_id)
            : objecter(o), linger_id(linger_id),
              watch_lock(ceph::make_shared_mutex(
                  fmt::format("LingerOp::watch_lock #{}", linger_id))) {}

        const LingerOp &operator=(const LingerOp &r) = delete;
        LingerOp(const LingerOp &o) = delete;

        uint64_t get_cookie()
        {
            return reinterpret_cast<uint64_t>(this);
        }
    };

    struct CB_Linger_Commit
    {
        Objecter *objecter;
        boost::intrusive_ptr<LingerOp> info;
        ceph::buffer::list outbl; // used for notify only
        CB_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {}
        ~CB_Linger_Commit() = default;

        void operator()(boost::system::error_code ec)
        {
            objecter->_linger_commit(info.get(), ec, outbl);
        }
    };

    struct CB_Linger_Reconnect
    {
        Objecter *objecter;
        boost::intrusive_ptr<LingerOp> info;
        CB_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {}
        ~CB_Linger_Reconnect() = default;

        void operator()(boost::system::error_code ec)
        {
            objecter->_linger_reconnect(info.get(), ec);
            info.reset();
        }
    };

    struct CB_Linger_Ping
    {
        Objecter *objecter;
        boost::intrusive_ptr<LingerOp> info;
        ceph::coarse_mono_time sent;
        uint32_t register_gen;
        CB_Linger_Ping(Objecter *o, LingerOp *l, ceph::coarse_mono_time s)
            : objecter(o), info(l), sent(s), register_gen(info->register_gen) {}
        void operator()(boost::system::error_code ec)
        {
            objecter->_linger_ping(info.get(), ec, sent, register_gen);
            info.reset();
        }
    };

    struct CB_Linger_Map_Latest
    {
        Objecter *objecter;
        uint64_t linger_id;
        CB_Linger_Map_Latest(Objecter *o, uint64_t id) : objecter(o), linger_id(id) {}
        void operator()(boost::system::error_code err, version_t latest, version_t);
    };

    // -- osd sessions --
    struct OSDBackoff
    {
        spg_t pgid;
        uint64_t id;
        hobject_t begin, end;
    };

    struct OSDSession : public RefCountedObject
    {
        // pending ops
        std::map<ceph_tid_t, Op *> ops;
        std::map<uint64_t, LingerOp *> linger_ops;
        std::map<ceph_tid_t, CommandOp *> command_ops;

        // backoffs
        std::map<spg_t, std::map<hobject_t, OSDBackoff>> backoffs;
        std::map<uint64_t, OSDBackoff *> backoffs_by_id;

        int osd;
        // NB locking two sessions at the same time is only safe because
        // it is only done in _recalc_linger_op_target with s and
        // linger_op->session, and it holds rwlock for write.  We disable
        // lockdep (using std::sharedMutex) because lockdep doesn't know
        // that.
        std::shared_mutex lock;

        int incarnation;
        ConnectionRef con;
        int num_locks;
        std::unique_ptr<std::mutex[]> completion_locks;

        OSDSession(CephContext *cct, int o) : osd(o), incarnation(0), con(NULL),
                                              num_locks(cct->_conf->objecter_completion_locks_per_session),
                                              completion_locks(new std::mutex[num_locks]) {}

        ~OSDSession() override;

        bool is_homeless() { return (osd == -1); }

        std::unique_lock<std::mutex> get_lock(object_t &oid);
    };
    std::map<int, OSDSession *> osd_sessions;

    bool osdmap_full_flag() const;
    bool osdmap_pool_full(const int64_t pool_id) const;

private:
    /**
     * Test pg_pool_t::FLAG_FULL on a pool
     *
     * @return true if the pool exists and has the flag set, or
     *         the global full flag is set, else false
     */
    bool _osdmap_pool_full(const int64_t pool_id) const;
    bool _osdmap_pool_full(const pg_pool_t &p) const
    {
        return p.has_flag(pg_pool_t::FLAG_FULL) && honor_pool_full;
    }
    void update_pool_full_map(std::map<int64_t, bool> &pool_full_map);

    std::map<uint64_t, LingerOp *> linger_ops;
    // we use this just to confirm a cookie is valid before dereferencing the ptr
    std::set<LingerOp *> linger_ops_set;

    std::map<ceph_tid_t, PoolStatOp *> poolstat_ops;
    std::map<ceph_tid_t, StatfsOp *> statfs_ops;
    std::map<ceph_tid_t, PoolOp *> pool_ops;
    std::atomic<unsigned> num_homeless_ops{0};

    OSDSession *homeless_session = new OSDSession(cct, -1);

    // ops waiting for an osdmap with a new pool or confirmation that
    // the pool does not exist (may be expanded to other uses later)
    std::map<uint64_t, LingerOp *> check_latest_map_lingers;
    std::map<ceph_tid_t, Op *> check_latest_map_ops;
    std::map<ceph_tid_t, CommandOp *> check_latest_map_commands;

    std::map<epoch_t,
             std::vector<std::pair<std::unique_ptr<OpCompletion>,
                                   boost::system::error_code>>>
        waiting_for_map;

    ceph::timespan mon_timeout;
    ceph::timespan osd_timeout;

    MOSDOp *_prepare_osd_op(Op *op);
    void _send_op(Op *op);
    void _send_op_account(Op *op);
    void _cancel_linger_op(Op *op);
    void _finish_op(Op *op, int r);
    static bool is_pg_changed(
        int oldprimary,
        const std::vector<int> &oldacting,
        int newprimary,
        const std::vector<int> &newacting,
        bool any_change = false);
    enum recalc_op_target_result
    {
        RECALC_OP_TARGET_NO_ACTION = 0,
        RECALC_OP_TARGET_NEED_RESEND,
        RECALC_OP_TARGET_POOL_DNE,
        RECALC_OP_TARGET_OSD_DNE,
        RECALC_OP_TARGET_OSD_DOWN,
    };
    bool _osdmap_full_flag() const;
    bool _osdmap_has_pool_full() const;
    void _prune_snapc(
        const mempool::osdmap::map<int64_t, snap_interval_set_t> &new_removed_snaps,
        Op *op);

    bool target_should_be_paused(op_target_t *op);
    int _calc_target(op_target_t *t, Connection *con,
                     bool any_change = false);
    int _map_session(op_target_t *op, OSDSession **s,
                     ceph::shunique_lock<ceph::shared_mutex> &lc);

    void _session_op_assign(OSDSession *s, Op *op);
    void _session_op_remove(OSDSession *s, Op *op);
    void _session_linger_op_assign(OSDSession *to, LingerOp *op);
    void _session_linger_op_remove(OSDSession *from, LingerOp *op);
    void _session_command_op_assign(OSDSession *to, CommandOp *op);
    void _session_command_op_remove(OSDSession *from, CommandOp *op);

    int _assign_op_target_session(Op *op, ceph::shunique_lock<ceph::shared_mutex> &lc,
                                  bool src_session_locked,
                                  bool dst_session_locked);
    int _recalc_linger_op_target(LingerOp *op,
                                 ceph::shunique_lock<ceph::shared_mutex> &lc);

    void _linger_submit(LingerOp *info,
                        ceph::shunique_lock<ceph::shared_mutex> &sul);
    void _send_linger(LingerOp *info,
                      ceph::shunique_lock<ceph::shared_mutex> &sul);
    void _linger_commit(LingerOp *info, boost::system::error_code ec,
                        ceph::buffer::list &outbl);
    void _linger_reconnect(LingerOp *info, boost::system::error_code ec);
    void _send_linger_ping(LingerOp *info);
    void _linger_ping(LingerOp *info, boost::system::error_code ec,
                      ceph::coarse_mono_time sent, uint32_t register_gen);
    boost::system::error_code _normalize_watch_error(boost::system::error_code ec);

    friend class CB_Objecter_GetVersion;
    friend class CB_DoWatchError;

public:
    template <typename CT>
    auto linger_callback_flush(CT &&ct)
    {
        // 帮助程序模板，用于从CompletionToken推断处理程序类型，捕获处理程序的本地副本，
        // 然后为处理程序创建async_result。
        boost::asio::async_completion<CT, void(void)> init(ct);
        boost::asio::defer(finish_strand, std::move(init.completion_handler));
        return init.result.get();
    }

private:
    void _check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl);
    void _send_op_map_check(Op *op);
    void _op_cancel_map_check(Op *op);
    void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
    void _send_linger_map_check(LingerOp *op);
    void _linger_cancel_map_check(LingerOp *op);
    void _check_command_map_dne(CommandOp *op);
    void _send_command_map_check(CommandOp *op);
    void _command_cancel_map_check(CommandOp *op);

    void _kick_requests(OSDSession *session, std::map<uint64_t, LingerOp *> &lresend);
    void _linger_ops_resend(std::map<uint64_t, LingerOp *> &lresend,
                            std::unique_lock<ceph::shared_mutex> &ul);

    int _get_session(int osd, OSDSession **session,
                     ceph::shunique_lock<ceph::shared_mutex> &sul);
    void put_session(OSDSession *s);
    void get_session(OSDSession *s);
    void _reopen_session(OSDSession *session);
    void close_session(OSDSession *session);

    void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
                      epoch_t reply_epoch);

    void resend_mon_ops();

    /**
     * handle a budget for in-flight ops
     * budget is taken whenever an op goes into the ops std::map
     * and returned whenever an op is removed from the std::map
     * If throttle_op needs to throttle it will unlock client_lock.
     */
    int calc_op_budget(const boost::container::small_vector_base<OSDOp> &ops);
    void _throttle_op(Op *op, ceph::shunique_lock<ceph::shared_mutex> &sul,
                      int op_size = 0);
    int _take_op_budget(Op *op, ceph::shunique_lock<ceph::shared_mutex> &sul)
    {
        ceph_assert(sul && sul.mutex() == &rwlock);
        int op_budget = calc_op_budget(op->ops);
        if (keep_balanced_budget)
        {
            _throttle_op(op, sul, op_budget);
        }
        else
        { // update take_linger_budget to match this!
            op_throttle_bytes.take(op_budget);
            op_throttle_ops.take(1);
        }
        op->budget = op_budget;
        return op_budget;
    }
    int take_linger_budget(LingerOp *info);
    void put_op_budget_bytes(int op_budget)
    {
        ceph_assert(op_budget >= 0);
        op_throttle_bytes.put(op_budget);
        op_throttle_ops.put(1);
    }
    void put_nlist_context_budget(NListContext *list_context);
    Throttle op_throttle_bytes{cct, "objecter_bytes",
                               static_cast<int64_t>(
                                   cct->_conf->objecter_inflight_op_bytes)};
    Throttle op_throttle_ops{cct, "objecter_ops",
                             static_cast<int64_t>(
                                 cct->_conf->objecter_inflight_ops)};

public:
    Objecter(CephContext *cct, Messenger *m, MonClient *mc,
             boost::asio::io_context &service);
    ~Objecter() override;

    void init();
    void start(const OSDMap *o = nullptr);
    void shutdown();

    // These two templates replace osdmap_(get)|(put)_read. Simply wrap
    // whatever functionality you want to use the OSDMap in a lambda like:
    //
    // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
    //
    // or
    //
    // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
    //
    // Do not call into something that will try to lock the OSDMap from
    // here or you will have great woe and misery.

    template <typename Callback, typename... Args>
    decltype(auto) with_osdmap(Callback &&cb, Args &&...args)
    {
        shared_lock l(rwlock);
        return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
    }

    /**
     * Tell the objecter to throttle outgoing ops according to its
     * budget (in _conf). If you do this, ops can block, in
     * which case it will unlock client_lock and sleep until
     * incoming messages reduce the used budget low enough for
     * the ops to continue going; then it will lock client_lock again.
     */
    void set_balanced_budget() { keep_balanced_budget = true; }
    void unset_balanced_budget() { keep_balanced_budget = false; }

    void set_honor_pool_full() { honor_pool_full = true; }
    void unset_honor_pool_full() { honor_pool_full = false; }

    void _scan_requests(
        OSDSession *s,
        bool skipped_map,
        bool cluster_full,
        std::map<int64_t, bool> *pool_full_map,
        std::map<ceph_tid_t, Op *> &need_resend,
        std::list<LingerOp *> &need_resend_linger,
        std::map<ceph_tid_t, CommandOp *> &need_resend_command,
        ceph::shunique_lock<ceph::shared_mutex> &sul);

    int64_t get_object_hash_position(int64_t pool, const std::string &key,
                                     const std::string &ns);
    int64_t get_object_pg_hash_position(int64_t pool, const std::string &key,
                                        const std::string &ns);

    // messages
public:
    bool ms_dispatch(Message *m) override;
    bool ms_can_fast_dispatch_any() const override
    {
        return true;
    }
    bool ms_can_fast_dispatch(const Message *m) const override
    {
        switch (m->get_type())
        {
        case CEPH_MSG_OSD_OPREPLY:
        case CEPH_MSG_WATCH_NOTIFY:
            return true;
        default:
            return false;
        }
    }
    void ms_fast_dispatch(Message *m) override
    {
        if (!ms_dispatch(m))
        {
            m->put();
        }
    }

    void handle_osd_op_reply(class MOSDOpReply *m);
    void handle_osd_backoff(class MOSDBackoff *m);
    void handle_watch_notify(class MWatchNotify *m);
    void handle_osd_map(class MOSDMap *m);
    void wait_for_osd_map(epoch_t e = 0);

    template <typename CompletionToken>
    auto wait_for_osd_map(CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken, void()> init(token);
        unique_lock l(rwlock);
        if (osdmap->get_epoch())
        {
            l.unlock();
            boost::asio::post(std::move(init.completion_handler));
        }
        else
        {
            waiting_for_map[0].emplace_back(
                OpCompletion::create(
                    service.get_executor(),
                    [c = std::move(init.completion_handler)](boost::system::error_code) mutable {
                        std::move(c)();
                    }),
                boost::system::error_code{});
            l.unlock();
        }
        return init.result.get();
    }

    /**
     * Get std::list of entities blocklisted since this was last called,
     * and reset the std::list.
     *
     * Uses a std::set because typical use case is to compare some
     * other std::list of clients to see which overlap with the blocklisted
     * addrs.
     *
     */
    void consume_blocklist_events(std::set<entity_addr_t> *events);

    int pool_snap_by_name(int64_t poolid,
                          const char *snap_name,
                          snapid_t *snap) const;
    int pool_snap_get_info(int64_t poolid, snapid_t snap,
                           pool_snap_info_t *info) const;
    int pool_snap_list(int64_t poolid, std::vector<uint64_t> *snaps);

private:
    void emit_blocklist_events(const OSDMap::Incremental &inc);
    void emit_blocklist_events(const OSDMap &old_osd_map,
                               const OSDMap &new_osd_map);

    // low-level
    void _op_submit(Op *op, ceph::shunique_lock<ceph::shared_mutex> &lc,
                    ceph_tid_t *ptid);
    void _op_submit_with_budget(Op *op,
                                ceph::shunique_lock<ceph::shared_mutex> &lc,
                                ceph_tid_t *ptid,
                                int *ctx_budget = NULL);
    // public interface
public:
    void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
    bool is_active()
    {
        std::shared_lock l(rwlock);
        return !((!inflight_ops) && linger_ops.empty() &&
                 poolstat_ops.empty() && statfs_ops.empty());
    }

    /**
     * Output in-flight requests
     */
    void _dump_active(OSDSession *s);
    void _dump_active();
    void dump_active();
    void dump_requests(ceph::Formatter *fmt);
    void _dump_ops(const OSDSession *s, ceph::Formatter *fmt);
    void dump_ops(ceph::Formatter *fmt);
    void _dump_linger_ops(const OSDSession *s, ceph::Formatter *fmt);
    void dump_linger_ops(ceph::Formatter *fmt);
    void _dump_command_ops(const OSDSession *s, ceph::Formatter *fmt);
    void dump_command_ops(ceph::Formatter *fmt);
    void dump_pool_ops(ceph::Formatter *fmt) const;
    void dump_pool_stat_ops(ceph::Formatter *fmt) const;
    void dump_statfs_ops(ceph::Formatter *fmt) const;

    int get_client_incarnation() const { return client_inc; }
    void set_client_incarnation(int inc) { client_inc = inc; }

    bool have_map(epoch_t epoch);

    struct CB_Objecter_GetVersion
    {
        Objecter *objecter;
        std::unique_ptr<OpCompletion> fin;

        CB_Objecter_GetVersion(Objecter *o, std::unique_ptr<OpCompletion> c)
            : objecter(o), fin(std::move(c)) {}
        void operator()(boost::system::error_code ec, version_t newest,
                        version_t oldest)
        {
            if (ec == boost::system::errc::resource_unavailable_try_again)
            {
                // try again as instructed
                objecter->_wait_for_latest_osdmap(std::move(*this));
            }
            else if (ec)
            {
                ceph::async::post(std::move(fin), ec);
            }
            else
            {
                auto l = std::unique_lock(objecter->rwlock);
                objecter->_get_latest_version(oldest, newest, std::move(fin),
                                              std::move(l));
            }
        }
    };

    template <typename CompletionToken>
    auto wait_for_map(epoch_t epoch, CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken, OpSignature> init(token);

        if (osdmap->get_epoch() >= epoch)
        {
            boost::asio::post(service,
                              ceph::async::bind_handler(
                                  std::move(init.completion_handler),
                                  boost::system::error_code()));
        }
        else
        {
            monc->get_version("osdmap",
                              CB_Objecter_GetVersion(
                                  this,
                                  OpCompletion::create(service.get_executor(),
                                                       std::move(init.completion_handler))));
        }
        return init.result.get();
    }

    void _wait_for_new_map(std::unique_ptr<OpCompletion>, epoch_t epoch,
                           boost::system::error_code = {});

private:
    void _wait_for_latest_osdmap(CB_Objecter_GetVersion &&c)
    {
        monc->get_version("osdmap", std::move(c));
    }

public:
    template <typename CompletionToken>
    auto wait_for_latest_osdmap(CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken, OpSignature> init(token);

        monc->get_version("osdmap",
                          CB_Objecter_GetVersion(
                              this,
                              OpCompletion::create(service.get_executor(),
                                                   std::move(init.completion_handler))));
        return init.result.get();
    }

    void wait_for_latest_osdmap(std::unique_ptr<OpCompletion> c)
    {
        monc->get_version("osdmap",
                          CB_Objecter_GetVersion(this, std::move(c)));
    }

    template <typename CompletionToken>
    auto get_latest_version(epoch_t oldest, epoch_t newest,
                            CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken, OpSignature> init(token);
        {
            std::unique_lock wl(rwlock);
            _get_latest_version(oldest, newest,
                                OpCompletion::create(
                                    service.get_executor(),
                                    std::move(init.completion_handler)),
                                std::move(wl));
        }
        return init.result.get();
    }

    void _get_latest_version(epoch_t oldest, epoch_t neweset,
                             std::unique_ptr<OpCompletion> fin,
                             std::unique_lock<ceph::shared_mutex> &&ul);

    /** Get the current set of global op flags */
    int get_global_op_flags() const { return global_op_flags; }
    /** Add a flag to the global op flags, not really atomic operation */
    void add_global_op_flags(int flag)
    {
        global_op_flags.fetch_or(flag);
    }
    /** Clear the passed flags from the global op flag set */
    void clear_global_op_flag(int flags)
    {
        global_op_flags.fetch_and(~flags);
    }

    /// cancel an in-progress request with the given return code
private:
    int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
    int _op_cancel(ceph_tid_t tid, int r);

public:
    int op_cancel(ceph_tid_t tid, int r);
    int op_cancel(const std::vector<ceph_tid_t> &tidls, int r);

    /**
     * Any write op which is in progress at the start of this call shall no
     * longer be in progress when this call ends.  Operations started after the
     * start of this call may still be in progress when this call ends.
     *
     * @return the latest possible epoch in which a cancelled op could have
     *         existed, or -1 if nothing was cancelled.
     */
    epoch_t op_cancel_writes(int r, int64_t pool = -1);

    // commands
    void osd_command(int osd, std::vector<std::string> cmd,
                     ceph::buffer::list inbl, ceph_tid_t *ptid,
                     decltype(CommandOp::onfinish) &&onfinish)
    {
        ceph_assert(osd >= 0);
        auto c = new CommandOp(
            osd,
            std::move(cmd),
            std::move(inbl),
            std::move(onfinish));
        submit_command(c, ptid);
    }
    template <typename CompletionToken>
    auto osd_command(int osd, std::vector<std::string> cmd,
                     ceph::buffer::list inbl, ceph_tid_t *ptid,
                     CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken,
                                      CommandOp::OpSig>
            init(token);
        osd_command(osd, std::move(cmd), std::move(inbl), ptid,
                    CommandOp::OpComp::create(service.get_executor(),
                                              std::move(init.completion_handler)));
        return init.result.get();
    }

    void pg_command(pg_t pgid, std::vector<std::string> cmd,
                    ceph::buffer::list inbl, ceph_tid_t *ptid,
                    decltype(CommandOp::onfinish) &&onfinish)
    {
        auto *c = new CommandOp(
            pgid,
            std::move(cmd),
            std::move(inbl),
            std::move(onfinish));
        submit_command(c, ptid);
    }

    template <typename CompletionToken>
    auto pg_command(pg_t pgid, std::vector<std::string> cmd,
                    ceph::buffer::list inbl, ceph_tid_t *ptid,
                    CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken,
                                      CommandOp::OpSig>
            init(token);
        pg_command(pgid, std::move(cmd), std::move(inbl), ptid,
                   CommandOp::OpComp::create(service.get_executor(),
                                             std::move(init.completion_handler)));
        return init.result.get();
    }

    // mid-level helpers
    Op *prepare_mutate_op(
        const object_t &oid, const object_locator_t &oloc,
        ObjectOperation &op, const SnapContext &snapc,
        ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        osd_reqid_t reqid = osd_reqid_t(),
        ZTracer::Trace *parent_trace = nullptr)
    {
        Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver,
                       nullptr, parent_trace);
        o->priority = op.priority;
        o->mtime = mtime;
        o->snapc = snapc;
        o->out_rval.swap(op.out_rval);
        o->out_bl.swap(op.out_bl);
        o->out_handler.swap(op.out_handler);
        o->out_ec.swap(op.out_ec);
        o->reqid = reqid;
        op.clear();
        return o;
    }
    ceph_tid_t mutate(
        const object_t &oid, const object_locator_t &oloc,
        ObjectOperation &op, const SnapContext &snapc,
        ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        osd_reqid_t reqid = osd_reqid_t())
    {
        Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
                                  oncommit, objver, reqid);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    void mutate(const object_t &oid, const object_locator_t &oloc,
                ObjectOperation &&op, const SnapContext &snapc,
                ceph::real_time mtime, int flags,
                std::unique_ptr<Op::OpComp> &&oncommit,
                version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
                ZTracer::Trace *parent_trace = nullptr)
    {
        Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
                       nullptr, parent_trace);
        o->priority = op.priority;
        o->mtime = mtime;
        o->snapc = snapc;
        o->out_bl.swap(op.out_bl);
        o->out_handler.swap(op.out_handler);
        o->out_rval.swap(op.out_rval);
        o->out_ec.swap(op.out_ec);
        o->reqid = reqid;
        op.clear();
        op_submit(o);
    }

    Op *prepare_read_op(
        const object_t &oid, const object_locator_t &oloc,
        ObjectOperation &op,
        snapid_t snapid, ceph::buffer::list *pbl, int flags,
        Context *onack, version_t *objver = NULL,
        int *data_offset = NULL,
        uint64_t features = 0,
        ZTracer::Trace *parent_trace = nullptr)
    {
        Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, onack, objver,
                       data_offset, parent_trace);
        o->priority = op.priority;
        o->snapid = snapid;
        o->outbl = pbl;
        if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length())
            o->outbl = op.out_bl[0];
        o->out_bl.swap(op.out_bl);
        o->out_handler.swap(op.out_handler);
        o->out_rval.swap(op.out_rval);
        o->out_ec.swap(op.out_ec);
        op.clear();
        return o;
    }
    ceph_tid_t read(
        const object_t &oid, const object_locator_t &oloc,
        ObjectOperation &op,
        snapid_t snapid, ceph::buffer::list *pbl, int flags,
        Context *onack, version_t *objver = NULL,
        int *data_offset = NULL,
        uint64_t features = 0)
    {
        Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
                                data_offset);
        if (features)
            o->features = features;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    void read(const object_t &oid, const object_locator_t &oloc,
              ObjectOperation &&op, snapid_t snapid, ceph::buffer::list *pbl,
              int flags, std::unique_ptr<Op::OpComp> &&onack,
              version_t *objver = nullptr, int *data_offset = nullptr,
              uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr)
    {
        Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, std::move(onack), objver,
                       data_offset, parent_trace);
        o->priority = op.priority;
        o->snapid = snapid;
        o->outbl = pbl;
        // XXX
        if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length())
        {
            o->outbl = op.out_bl[0];
        }
        o->out_bl.swap(op.out_bl);
        o->out_handler.swap(op.out_handler);
        o->out_rval.swap(op.out_rval);
        o->out_ec.swap(op.out_ec);
        if (features)
            o->features = features;
        op.clear();
        op_submit(o);
    }

    Op *prepare_pg_read_op(
        uint32_t hash, object_locator_t oloc,
        ObjectOperation &op, ceph::buffer::list *pbl, int flags,
        Context *onack, epoch_t *reply_epoch,
        int *ctx_budget)
    {
        Op *o = new Op(object_t(), oloc,
                       std::move(op.ops),
                       flags | global_op_flags | CEPH_OSD_FLAG_READ |
                           CEPH_OSD_FLAG_IGNORE_OVERLAY,
                       onack, NULL);
        o->target.precalc_pgid = true;
        o->target.base_pgid = pg_t(hash, oloc.pool);
        o->priority = op.priority;
        o->snapid = CEPH_NOSNAP;
        o->outbl = pbl;
        o->out_bl.swap(op.out_bl);
        o->out_handler.swap(op.out_handler);
        o->out_rval.swap(op.out_rval);
        o->out_ec.swap(op.out_ec);
        o->reply_epoch = reply_epoch;
        if (ctx_budget)
        {
            // budget is tracked by listing context
            o->ctx_budgeted = true;
        }
        op.clear();
        return o;
    }
    ceph_tid_t pg_read(
        uint32_t hash, object_locator_t oloc,
        ObjectOperation &op, ceph::buffer::list *pbl, int flags,
        Context *onack, epoch_t *reply_epoch,
        int *ctx_budget)
    {
        Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
                                   onack, reply_epoch, ctx_budget);
        ceph_tid_t tid;
        op_submit(o, &tid, ctx_budget);
        return tid;
    }

    ceph_tid_t pg_read(
        uint32_t hash, object_locator_t oloc,
        ObjectOperation &op, ceph::buffer::list *pbl, int flags,
        std::unique_ptr<Op::OpComp> &&onack, epoch_t *reply_epoch, int *ctx_budget)
    {
        ceph_tid_t tid;
        Op *o = new Op(object_t(), oloc,
                       std::move(op.ops),
                       flags | global_op_flags | CEPH_OSD_FLAG_READ |
                           CEPH_OSD_FLAG_IGNORE_OVERLAY,
                       std::move(onack), nullptr);
        o->target.precalc_pgid = true;
        o->target.base_pgid = pg_t(hash, oloc.pool);
        o->priority = op.priority;
        o->snapid = CEPH_NOSNAP;
        o->outbl = pbl;
        o->out_bl.swap(op.out_bl);
        o->out_handler.swap(op.out_handler);
        o->out_rval.swap(op.out_rval);
        o->out_ec.swap(op.out_ec);
        o->reply_epoch = reply_epoch;
        if (ctx_budget)
        {
            // budget is tracked by listing context
            o->ctx_budgeted = true;
        }
        op_submit(o, &tid, ctx_budget);
        op.clear();
        return tid;
    }

    // caller owns a ref
    LingerOp *linger_register(const object_t &oid, const object_locator_t &oloc,
                              int flags);
    ceph_tid_t linger_watch(LingerOp *info,
                            ObjectOperation &op,
                            const SnapContext &snapc, ceph::real_time mtime,
                            ceph::buffer::list &inbl,
                            decltype(info->on_reg_commit) &&oncommit,
                            version_t *objver);
    ceph_tid_t linger_watch(LingerOp *info,
                            ObjectOperation &op,
                            const SnapContext &snapc, ceph::real_time mtime,
                            ceph::buffer::list &inbl,
                            Context *onfinish,
                            version_t *objver)
    {
        return linger_watch(info, op, snapc, mtime, inbl,
                            OpContextVert<ceph::buffer::list>(onfinish, nullptr), objver);
    }
    ceph_tid_t linger_notify(LingerOp *info,
                             ObjectOperation &op,
                             snapid_t snap, ceph::buffer::list &inbl,
                             decltype(LingerOp::on_reg_commit) &&onfinish,
                             version_t *objver);
    ceph_tid_t linger_notify(LingerOp *info,
                             ObjectOperation &op,
                             snapid_t snap, ceph::buffer::list &inbl,
                             ceph::buffer::list *poutbl,
                             Context *onack,
                             version_t *objver)
    {
        return linger_notify(info, op, snap, inbl,
                             OpContextVert(onack, poutbl),
                             objver);
    }
    tl::expected<ceph::timespan,
                 boost::system::error_code>
    linger_check(LingerOp *info);
    void linger_cancel(LingerOp *info); // releases a reference
    void _linger_cancel(LingerOp *info);

    void _do_watch_notify(boost::intrusive_ptr<LingerOp> info,
                          boost::intrusive_ptr<MWatchNotify> m);

    /**
     * set up initial ops in the op std::vector, and allocate a final op slot.
     *
     * The caller is responsible for filling in the final ops_count ops.
     *
     * @param ops op std::vector
     * @param ops_count number of final ops the caller will fill in
     * @param extra_ops pointer to [array of] initial op[s]
     * @return index of final op (for caller to fill in)
     */
    int init_ops(boost::container::small_vector_base<OSDOp> &ops, int ops_count,
                 ObjectOperation *extra_ops)
    {
        int i;
        int extra = 0;

        if (extra_ops)
            extra = extra_ops->ops.size();

        ops.resize(ops_count + extra);

        for (i = 0; i < extra; i++)
        {
            ops[i] = extra_ops->ops[i];
        }

        return i;
    }

    // high-level helpers
    Op *prepare_stat_op(
        const object_t &oid, const object_locator_t &oloc,
        snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
        int flags, Context *onfinish, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_STAT;
        C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, fin, objver);
        o->snapid = snap;
        o->outbl = &fin->bl;
        return o;
    }
    ceph_tid_t stat(
        const object_t &oid, const object_locator_t &oloc,
        snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
        int flags, Context *onfinish, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL)
    {
        Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
                                onfinish, objver, extra_ops);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    Op *prepare_read_op(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
        int flags, Context *onfinish, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0,
        ZTracer::Trace *parent_trace = nullptr)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_READ;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = len;
        ops[i].op.extent.truncate_size = 0;
        ops[i].op.extent.truncate_seq = 0;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver,
                       nullptr, parent_trace);
        o->snapid = snap;
        o->outbl = pbl;
        return o;
    }
    ceph_tid_t read(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
        int flags, Context *onfinish, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
                                onfinish, objver, extra_ops, op_flags);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    Op *prepare_cmpext_op(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t off, ceph::buffer::list &cmp_bl,
        snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_CMPEXT;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = cmp_bl.length();
        ops[i].op.extent.truncate_size = 0;
        ops[i].op.extent.truncate_seq = 0;
        ops[i].indata = cmp_bl;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
        o->snapid = snap;
        return o;
    }

    ceph_tid_t cmpext(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t off, ceph::buffer::list &cmp_bl,
        snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
                                  flags, onfinish, objver, extra_ops, op_flags);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    ceph_tid_t read_trunc(const object_t &oid, const object_locator_t &oloc,
                          uint64_t off, uint64_t len, snapid_t snap,
                          ceph::buffer::list *pbl, int flags, uint64_t trunc_size,
                          __u32 trunc_seq, Context *onfinish,
                          version_t *objver = NULL,
                          ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_READ;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = len;
        ops[i].op.extent.truncate_size = trunc_size;
        ops[i].op.extent.truncate_seq = trunc_seq;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
        o->snapid = snap;
        o->outbl = pbl;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t mapext(const object_t &oid, const object_locator_t &oloc,
                      uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
                      int flags, Context *onfinish, version_t *objver = NULL,
                      ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_MAPEXT;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = len;
        ops[i].op.extent.truncate_size = 0;
        ops[i].op.extent.truncate_seq = 0;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
        o->snapid = snap;
        o->outbl = pbl;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t getxattr(const object_t &oid, const object_locator_t &oloc,
                        const char *name, snapid_t snap, ceph::buffer::list *pbl, int flags,
                        Context *onfinish,
                        version_t *objver = NULL, ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_GETXATTR;
        ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
        ops[i].op.xattr.value_len = 0;
        if (name)
            ops[i].indata.append(name, ops[i].op.xattr.name_len);
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
        o->snapid = snap;
        o->outbl = pbl;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    ceph_tid_t getxattrs(const object_t &oid, const object_locator_t &oloc,
                         snapid_t snap, std::map<std::string, ceph::buffer::list> &attrset,
                         int flags, Context *onfinish, version_t *objver = NULL,
                         ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
        C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_READ, fin, objver);
        o->snapid = snap;
        o->outbl = &fin->bl;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    ceph_tid_t read_full(const object_t &oid, const object_locator_t &oloc,
                         snapid_t snap, ceph::buffer::list *pbl, int flags,
                         Context *onfinish, version_t *objver = NULL,
                         ObjectOperation *extra_ops = NULL)
    {
        return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
    }

    // writes
    ceph_tid_t _modify(const object_t &oid, const object_locator_t &oloc,
                       osdc_opvec &ops,
                       ceph::real_time mtime,
                       const SnapContext &snapc, int flags,
                       Context *oncommit,
                       version_t *objver = NULL)
    {
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    Op *prepare_write_op(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t off, uint64_t len, const SnapContext &snapc,
        const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0,
        ZTracer::Trace *parent_trace = nullptr)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_WRITE;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = len;
        ops[i].op.extent.truncate_size = 0;
        ops[i].op.extent.truncate_seq = 0;
        ops[i].indata = bl;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
                       nullptr, parent_trace);
        o->mtime = mtime;
        o->snapc = snapc;
        return o;
    }
    ceph_tid_t write(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t off, uint64_t len, const SnapContext &snapc,
        const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
                                 oncommit, objver, extra_ops, op_flags);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    Op *prepare_append_op(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t len, const SnapContext &snapc,
        const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
        Context *oncommit,
        version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_APPEND;
        ops[i].op.extent.offset = 0;
        ops[i].op.extent.length = len;
        ops[i].op.extent.truncate_size = 0;
        ops[i].op.extent.truncate_seq = 0;
        ops[i].indata = bl;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        return o;
    }
    ceph_tid_t append(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t len, const SnapContext &snapc,
        const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
        Context *oncommit,
        version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL)
    {
        Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
                                  oncommit, objver, extra_ops);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t write_trunc(const object_t &oid, const object_locator_t &oloc,
                           uint64_t off, uint64_t len, const SnapContext &snapc,
                           const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
                           uint64_t trunc_size, __u32 trunc_seq,
                           Context *oncommit,
                           version_t *objver = NULL,
                           ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_WRITE;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = len;
        ops[i].op.extent.truncate_size = trunc_size;
        ops[i].op.extent.truncate_seq = trunc_seq;
        ops[i].indata = bl;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    Op *prepare_write_full_op(
        const object_t &oid, const object_locator_t &oloc,
        const SnapContext &snapc, const ceph::buffer::list &bl,
        ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
        ops[i].op.extent.offset = 0;
        ops[i].op.extent.length = bl.length();
        ops[i].indata = bl;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        return o;
    }
    ceph_tid_t write_full(
        const object_t &oid, const object_locator_t &oloc,
        const SnapContext &snapc, const ceph::buffer::list &bl,
        ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {
        Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
                                      oncommit, objver, extra_ops, op_flags);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    Op *prepare_writesame_op(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t write_len, uint64_t off,
        const SnapContext &snapc, const ceph::buffer::list &bl,
        ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {

        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_WRITESAME;
        ops[i].op.writesame.offset = off;
        ops[i].op.writesame.length = write_len;
        ops[i].op.writesame.data_length = bl.length();
        ops[i].indata = bl;
        ops[i].op.flags = op_flags;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        return o;
    }
    ceph_tid_t writesame(
        const object_t &oid, const object_locator_t &oloc,
        uint64_t write_len, uint64_t off,
        const SnapContext &snapc, const ceph::buffer::list &bl,
        ceph::real_time mtime, int flags,
        Context *oncommit, version_t *objver = NULL,
        ObjectOperation *extra_ops = NULL, int op_flags = 0)
    {

        Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
                                     mtime, flags, oncommit, objver,
                                     extra_ops, op_flags);

        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t trunc(const object_t &oid, const object_locator_t &oloc,
                     const SnapContext &snapc, ceph::real_time mtime, int flags,
                     uint64_t trunc_size, __u32 trunc_seq,
                     Context *oncommit, version_t *objver = NULL,
                     ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
        ops[i].op.extent.offset = trunc_size;
        ops[i].op.extent.truncate_size = trunc_size;
        ops[i].op.extent.truncate_seq = trunc_seq;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t zero(const object_t &oid, const object_locator_t &oloc,
                    uint64_t off, uint64_t len, const SnapContext &snapc,
                    ceph::real_time mtime, int flags, Context *oncommit,
                    version_t *objver = NULL, ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_ZERO;
        ops[i].op.extent.offset = off;
        ops[i].op.extent.length = len;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t rollback_object(const object_t &oid, const object_locator_t &oloc,
                               const SnapContext &snapc, snapid_t snapid,
                               ceph::real_time mtime, Context *oncommit,
                               version_t *objver = NULL,
                               ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
        ops[i].op.snap.snapid = snapid;
        Op *o = new Op(oid, oloc, std::move(ops), CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t create(const object_t &oid, const object_locator_t &oloc,
                      const SnapContext &snapc, ceph::real_time mtime, int global_flags,
                      int create_flags, Context *oncommit,
                      version_t *objver = NULL,
                      ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_CREATE;
        ops[i].op.flags = create_flags;
        Op *o = new Op(oid, oloc, std::move(ops), global_flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    Op *prepare_remove_op(
        const object_t &oid, const object_locator_t &oloc,
        const SnapContext &snapc, ceph::real_time mtime, int flags,
        Context *oncommit,
        version_t *objver = NULL, ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_DELETE;
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        return o;
    }
    ceph_tid_t remove(
        const object_t &oid, const object_locator_t &oloc,
        const SnapContext &snapc, ceph::real_time mtime, int flags,
        Context *oncommit,
        version_t *objver = NULL, ObjectOperation *extra_ops = NULL)
    {
        Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
                                  oncommit, objver, extra_ops);
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    ceph_tid_t setxattr(const object_t &oid, const object_locator_t &oloc,
                        const char *name, const SnapContext &snapc, const ceph::buffer::list &bl,
                        ceph::real_time mtime, int flags,
                        Context *oncommit,
                        version_t *objver = NULL, ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_SETXATTR;
        ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
        ops[i].op.xattr.value_len = bl.length();
        if (name)
            ops[i].indata.append(name, ops[i].op.xattr.name_len);
        ops[i].indata.append(bl);
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit,
                       objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }
    ceph_tid_t removexattr(const object_t &oid, const object_locator_t &oloc,
                           const char *name, const SnapContext &snapc,
                           ceph::real_time mtime, int flags,
                           Context *oncommit,
                           version_t *objver = NULL, ObjectOperation *extra_ops = NULL)
    {
        osdc_opvec ops;
        int i = init_ops(ops, 1, extra_ops);
        ops[i].op.op = CEPH_OSD_OP_RMXATTR;
        ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
        ops[i].op.xattr.value_len = 0;
        if (name)
            ops[i].indata.append(name, ops[i].op.xattr.name_len);
        Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
        o->mtime = mtime;
        o->snapc = snapc;
        ceph_tid_t tid;
        op_submit(o, &tid);
        return tid;
    }

    void list_nobjects(NListContext *p, Context *onfinish);
    uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
    uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t &c);
    void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);

    hobject_t enumerate_objects_begin();
    hobject_t enumerate_objects_end();

    template <typename T>
    friend struct EnumerationContext;
    template <typename T>
    friend struct CB_EnumerateReply;
    template <typename T>
    void enumerate_objects(
        int64_t pool_id,
        std::string_view ns,
        hobject_t start,
        hobject_t end,
        const uint32_t max,
        const ceph::buffer::list &filter_bl,
        fu2::unique_function<void(boost::system::error_code,
                                  std::vector<T>,
                                  hobject_t) &&>
            on_finish);
    template <typename T>
    void _issue_enumerate(hobject_t start,
                          std::unique_ptr<EnumerationContext<T>>);
    template <typename T>
    void _enumerate_reply(
        ceph::buffer::list &&bl,
        boost::system::error_code ec,
        std::unique_ptr<EnumerationContext<T>> &&ectx);

    // -------------------------
    // pool ops
private:
    void pool_op_submit(PoolOp *op);
    void _pool_op_submit(PoolOp *op);
    void _finish_pool_op(PoolOp *op, int r);
    void _do_delete_pool(int64_t pool,
                         decltype(PoolOp::onfinish) &&onfinish);

public:
    void create_pool_snap(int64_t pool, std::string_view snapName,
                          decltype(PoolOp::onfinish) &&onfinish);
    void create_pool_snap(int64_t pool, std::string_view snapName,
                          Context *c)
    {
        create_pool_snap(pool, snapName,
                         OpContextVert<ceph::buffer::list>(c, nullptr));
    }
    void allocate_selfmanaged_snap(int64_t pool,
                                   std::unique_ptr<ceph::async::Completion<
                                       void(boost::system::error_code,
                                            snapid_t)>>
                                       onfinish);
    void allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
                                   Context *c)
    {
        allocate_selfmanaged_snap(pool,
                                  OpContextVert(c, psnapid));
    }
    void delete_pool_snap(int64_t pool, std::string_view snapName,
                          decltype(PoolOp::onfinish) &&onfinish);
    void delete_pool_snap(int64_t pool, std::string_view snapName,
                          Context *c)
    {
        delete_pool_snap(pool, snapName,
                         OpContextVert<ceph::buffer::list>(c, nullptr));
    }

    void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
                                 decltype(PoolOp::onfinish) &&onfinish);
    void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
                                 Context *c)
    {
        delete_selfmanaged_snap(pool, snap,
                                OpContextVert<ceph::buffer::list>(c, nullptr));
    }

    void create_pool(std::string_view name,
                     decltype(PoolOp::onfinish) &&onfinish,
                     int crush_rule = -1);
    void create_pool(std::string_view name, Context *onfinish,
                     int crush_rule = -1)
    {
        create_pool(name,
                    OpContextVert<ceph::buffer::list>(onfinish, nullptr),
                    crush_rule);
    }
    void delete_pool(int64_t pool,
                     decltype(PoolOp::onfinish) &&onfinish);
    void delete_pool(int64_t pool,
                     Context *onfinish)
    {
        delete_pool(pool, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
    }

    void delete_pool(std::string_view name,
                     decltype(PoolOp::onfinish) &&onfinish);

    void delete_pool(std::string_view name,
                     Context *onfinish)
    {
        delete_pool(name, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
    }

    void handle_pool_op_reply(MPoolOpReply *m);
    int pool_op_cancel(ceph_tid_t tid, int r);

    // --------------------------
    // pool stats
private:
    void _poolstat_submit(PoolStatOp *op);

public:
    void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
    void get_pool_stats(const std::vector<std::string> &pools,
                        decltype(PoolStatOp::onfinish) &&onfinish);
    template <typename CompletionToken>
    auto get_pool_stats(const std::vector<std::string> &pools,
                        CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken,
                                      PoolStatOp::OpSig>
            init(token);
        get_pool_stats(pools,
                       PoolStatOp::OpComp::create(
                           service.get_executor(),
                           std::move(init.completion_handler)));
        return init.result.get();
    }
    int pool_stat_op_cancel(ceph_tid_t tid, int r);
    void _finish_pool_stat_op(PoolStatOp *op, int r);

    // ---------------------------
    // df stats
private:
    void _fs_stats_submit(StatfsOp *op);

public:
    void handle_fs_stats_reply(MStatfsReply *m);
    void get_fs_stats(boost::optional<int64_t> poolid,
                      decltype(StatfsOp::onfinish) &&onfinish);
    template <typename CompletionToken>
    auto get_fs_stats(boost::optional<int64_t> poolid,
                      CompletionToken &&token)
    {
        boost::asio::async_completion<CompletionToken, StatfsOp::OpSig> init(token);
        get_fs_stats(poolid,
                     StatfsOp::OpComp::create(service.get_executor(),
                                              std::move(init.completion_handler)));
        return init.result.get();
    }
    void get_fs_stats(struct ceph_statfs &result, boost::optional<int64_t> poolid,
                      Context *onfinish)
    {
        get_fs_stats(poolid, OpContextVert(onfinish, result));
    }
    int statfs_op_cancel(ceph_tid_t tid, int r);
    void _finish_statfs_op(StatfsOp *op, int r);

    // ---------------------------
    // some scatter/gather hackery

    void _sg_read_finish(std::vector<ObjectExtent> &extents,
                         std::vector<ceph::buffer::list> &resultbl,
                         ceph::buffer::list *bl, Context *onfinish);

    struct C_SGRead : public Context
    {
        Objecter *objecter;
        std::vector<ObjectExtent> extents;
        std::vector<ceph::buffer::list> resultbl;
        ceph::buffer::list *bl;
        Context *onfinish;
        C_SGRead(Objecter *ob,
                 std::vector<ObjectExtent> &e, std::vector<ceph::buffer::list> &r, ceph::buffer::list *b,
                 Context *c) : objecter(ob), bl(b), onfinish(c)
        {
            extents.swap(e);
            resultbl.swap(r);
        }
        void finish(int r) override
        {
            objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
        }
    };

    void sg_read_trunc(std::vector<ObjectExtent> &extents, snapid_t snap,
                       ceph::buffer::list *bl, int flags, uint64_t trunc_size,
                       __u32 trunc_seq, Context *onfinish, int op_flags = 0)
    {
        if (extents.size() == 1)
        {
            read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
                       extents[0].length, snap, bl, flags, extents[0].truncate_size,
                       trunc_seq, onfinish, 0, 0, op_flags);
        }
        else
        {
            C_GatherBuilder gather(cct);
            std::vector<ceph::buffer::list> resultbl(extents.size());
            int i = 0;
            for (auto p = extents.begin(); p != extents.end(); ++p)
            {
                read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
                           flags, p->truncate_size, trunc_seq, gather.new_sub(),
                           0, 0, op_flags);
            }
            gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
            gather.activate();
        }
    }

    void sg_read(std::vector<ObjectExtent> &extents, snapid_t snap, ceph::buffer::list *bl,
                 int flags, Context *onfinish, int op_flags = 0)
    {
        sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
    }

    void sg_write_trunc(std::vector<ObjectExtent> &extents, const SnapContext &snapc,
                        const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
                        uint64_t trunc_size, __u32 trunc_seq,
                        Context *oncommit, int op_flags = 0)
    {
        if (extents.size() == 1)
        {
            write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
                        extents[0].length, snapc, bl, mtime, flags,
                        extents[0].truncate_size, trunc_seq, oncommit,
                        0, 0, op_flags);
        }
        else
        {
            C_GatherBuilder gcom(cct, oncommit);
            auto it = bl.cbegin();
            for (auto p = extents.begin(); p != extents.end(); ++p)
            {
                ceph::buffer::list cur;
                for (auto bit = p->buffer_extents.begin();
                     bit != p->buffer_extents.end();
                     ++bit)
                {
                    if (it.get_off() != bit->first)
                    {
                        it.seek(bit->first);
                    }
                    it.copy(bit->second, cur);
                }
                ceph_assert(cur.length() == p->length);
                write_trunc(p->oid, p->oloc, p->offset, p->length,
                            snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
                            oncommit ? gcom.new_sub() : 0,
                            0, 0, op_flags);
            }
            gcom.activate();
        }
    }

    void sg_write(std::vector<ObjectExtent> &extents, const SnapContext &snapc,
                  const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
                  Context *oncommit, int op_flags = 0)
    {
        sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
                       op_flags);
    }

    void ms_handle_connect(Connection *con) override;
    bool ms_handle_reset(Connection *con) override;
    void ms_handle_remote_reset(Connection *con) override;
    bool ms_handle_refused(Connection *con) override;

    void blocklist_self(bool set);

private:
    epoch_t epoch_barrier = 0;
    bool retry_writes_after_first_reply =
        cct->_conf->objecter_retry_writes_after_first_reply;

public:
    void set_epoch_barrier(epoch_t epoch);

    PerfCounters *get_logger()
    {
        return logger;
    }
};

#endif
